Merge branch '3.0' of https://github.com/taosdata/TDengine into test/3.0/TD-24125

This commit is contained in:
chenhaoran 2023-10-12 18:02:37 +08:00
commit dd8f81d9aa
32 changed files with 2902 additions and 3910 deletions

View File

@ -24,7 +24,7 @@ SELECT [hints] [DISTINCT] select_list
hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */
hint:
BATCH_SCAN | NO_BATCH_SCAN
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP
select_list:
select_expr [, select_expr] ...
@ -88,14 +88,16 @@ Hints are a means of user control over query optimization for individual stateme
The list of currently supported Hints is as follows:
| **Hint** | **Params** | **Comment** | **Scopt** |
| :-----------: | -------------- | -------------------------- | -------------------------- |
| :-----------: | -------------- | -------------------------- | -----------------------------------|
| BATCH_SCAN | None | Batch table scan | JOIN statment for stable |
| NO_BATCH_SCAN | None | Sequential table scan | JOIN statment for stable |
| SORT_FOR_GROUP| None | Use sort for partition | With normal column in partition by list |
For example:
```sql
SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts;
SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1;
```
## Lists

View File

@ -54,6 +54,7 @@ LIKE is used together with wildcards to match strings. Its usage is described as
MATCH and NMATCH are used together with regular expressions to match strings. Their usage is described as follows:
- Use POSIX regular expression syntax. For more information, see Regular Expressions.
- The `MATCH` operator returns true when the regular expression is matched. The `NMATCH` operator returns true when the regular expression is not matched.
- Regular expression can be used against only table names, i.e. `tbname`, and tags/columns of binary/nchar types.
- The maximum length of regular expression string is 128 bytes. Configuration parameter `maxRegexStringLen` can be used to set the maximum allowed regular expression. It's a configuration parameter on the client side, and will take effect after restarting the client.

View File

@ -73,10 +73,10 @@ Shows the SQL statement used to create the specified table. This statement can b
## SHOW DATABASES
```sql
SHOW DATABASES;
SHOW [USER | SYSTEM] DATABASES;
```
Shows all user-created databases.
Shows all databases. The `USER` qualifier specifies only user-created databases. The `SYSTEM` qualifier specifies only system databases.
## SHOW DNODES
@ -183,10 +183,10 @@ Shows all subscriptions in the system.
## SHOW TABLES
```sql
SHOW [db_name.]TABLES [LIKE 'pattern'];
SHOW [NORMAL | CHILD] [db_name.]TABLES [LIKE 'pattern'];
```
Shows all standard tables and subtables in the current database. You can use LIKE for fuzzy matching.
Shows all standard tables and subtables in the current database. You can use LIKE for fuzzy matching. The `Normal` qualifier specifies standard tables. The `CHILD` qualifier specifies subtables.
## SHOW TABLE DISTRIBUTED

View File

@ -24,7 +24,7 @@ SELECT [hints] [DISTINCT] select_list
hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */
hint:
BATCH_SCAN | NO_BATCH_SCAN
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP
select_list:
select_expr [, select_expr] ...
@ -88,14 +88,16 @@ Hints 是用户控制单个语句查询优化的一种手段,当 Hint 不适
目前支持的 Hints 列表如下:
| **Hint** | **参数** | **说明** | **适用范围** |
| :-----------: | -------------- | -------------------------- | -------------------------- |
| :-----------: | -------------- | -------------------------- | -----------------------------|
| BATCH_SCAN | 无 | 采用批量读表的方式 | 超级表 JOIN 语句 |
| NO_BATCH_SCAN | 无 | 采用顺序读表的方式 | 超级表 JOIN 语句 |
| SORT_FOR_GROUP| 无 | 采用sort方式进行分组 | partition by 列表有普通列时 |
举例:
```sql
SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts;
SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1;
```
## 列表

View File

@ -54,6 +54,7 @@ LIKE 条件使用通配符字符串进行匹配检查,规则如下:
MATCH 条件和 NMATCH 条件使用正则表达式进行匹配,规则如下:
- 支持符合 POSIX 规范的正则表达式,具体规范内容可参见 Regular Expressions。
- MATCH 和正则表达式匹配时, 返回 TURE. NMATCH 和正则表达式不匹配时, 返回 TRUE.
- 只能针对子表名(即 tbname、字符串类型的标签值进行正则表达式过滤不支持普通列的过滤。
- 正则匹配字符串长度不能超过 128 字节。可以通过参数 maxRegexStringLen 设置和调整最大允许的正则匹配字符串,该参数是客户端配置参数,需要重启客户端才能生效

View File

@ -73,10 +73,10 @@ SHOW CREATE TABLE [db_name.]tb_name
## SHOW DATABASES
```sql
SHOW DATABASES;
SHOW [USER | SYSTEM] DATABASES;
```
显示用户定义的所有数据库。
显示定义的所有数据库。SYSTEM 指定只显示系统数据库。USER 指定只显示用户创建的数据库。
## SHOW DNODES
@ -183,10 +183,10 @@ SHOW SUBSCRIPTIONS;
## SHOW TABLES
```sql
SHOW [db_name.]TABLES [LIKE 'pattern'];
SHOW [NORMAL | CHILD] [db_name.]TABLES [LIKE 'pattern'];
```
显示当前数据库下的所有普通表和子表的信息。可以使用 LIKE 对表名进行模糊匹配。
显示当前数据库下的所有普通表和子表的信息。可以使用 LIKE 对表名进行模糊匹配。NORMAL 指定只显示普通表信息, CHILD 指定只显示子表信息。
## SHOW TABLE DISTRIBUTED

View File

@ -51,13 +51,11 @@ typedef enum {
} EGrantType;
int32_t grantCheck(EGrantType grant);
#ifdef TD_ENTERPRISE
#ifndef TD_GRANT_OPTIMIZE
int32_t grantAlterActiveCode(const char* old, const char* new, char* out, int8_t type);
#else
int32_t grantAlterActiveCode(int32_t did, const char* old, const char* new, char* out, int8_t type);
#endif
#endif
#ifndef GRANTS_CFG
#ifdef TD_ENTERPRISE

View File

@ -362,6 +362,8 @@
#define TK_WAL 343
#define TK_NK_SPACE 600
#define TK_NK_COMMENT 601
#define TK_NK_ILLEGAL 602

View File

@ -507,6 +507,7 @@ typedef struct SBalanceVgroupStmt {
typedef struct SBalanceVgroupLeaderStmt {
ENodeType type;
int32_t vgId;
} SBalanceVgroupLeaderStmt;
typedef struct SMergeVgroupStmt {

View File

@ -382,6 +382,7 @@ typedef enum ELogicConditionType {
#define TSDB_MAX_STT_TRIGGER 1
#define TSDB_DEFAULT_SST_TRIGGER 1
#endif
#define TSDB_STT_TRIGGER_ARRAY_SIZE 16 // maximum of TSDB_MAX_STT_TRIGGER of TD_ENTERPRISE and TD_COMMUNITY
#define TSDB_MIN_HASH_PREFIX (2 - TSDB_TABLE_NAME_LEN)
#define TSDB_MAX_HASH_PREFIX (TSDB_TABLE_NAME_LEN - 2)
#define TSDB_DEFAULT_HASH_PREFIX 0

View File

@ -157,6 +157,10 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas
tscDebug("new app inst mgr %p, user:%s, ip:%s, port:%d", p, user, epSet.epSet.eps[0].fqdn, epSet.epSet.eps[0].port);
pInst = &p;
} else {
ASSERTS((*pInst) && (*pInst)->pAppHbMgr, "*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL);
// reset to 0 in case of conn with duplicated user key but its user has ever been dropped.
atomic_store_8(&(*pInst)->pAppHbMgr->connHbFlag, 0);
}
taosThreadMutexUnlock(&appInfo.mutex);

View File

@ -544,7 +544,9 @@ STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
if (pTrans == NULL) {
terrno = TSDB_CODE_MND_TRANS_NOT_EXIST;
} else {
#ifdef WINDOWS
taosThreadMutexInit(&pTrans->mutex, NULL);
#endif
}
return pTrans;
}

View File

@ -625,7 +625,7 @@ struct SDFileSet {
SDataFile *pDataF;
SSmaFile *pSmaF;
uint8_t nSttF;
SSttFile *aSttF[TSDB_MAX_STT_TRIGGER];
SSttFile *aSttF[TSDB_STT_TRIGGER_ARRAY_SIZE];
};
struct STSDBRowIter {
@ -694,7 +694,7 @@ struct SDataFWriter {
SHeadFile fHead;
SDataFile fData;
SSmaFile fSma;
SSttFile fStt[TSDB_MAX_STT_TRIGGER];
SSttFile fStt[TSDB_STT_TRIGGER_ARRAY_SIZE];
uint8_t *aBuf[4];
};
@ -705,7 +705,7 @@ struct SDataFReader {
STsdbFD *pHeadFD;
STsdbFD *pDataFD;
STsdbFD *pSmaFD;
STsdbFD *aSttFD[TSDB_MAX_STT_TRIGGER];
STsdbFD *aSttFD[TSDB_STT_TRIGGER_ARRAY_SIZE];
uint8_t *aBuf[3];
};

View File

@ -268,6 +268,8 @@ STqReader* tqReaderOpen(SVnode* pVnode) {
}
void tqReaderClose(STqReader* pReader) {
if (pReader == NULL) return;
// close wal reader
if (pReader->pWalReader) {
walCloseReader(pReader->pWalReader);

View File

@ -69,7 +69,7 @@ typedef struct {
SDataIter *pIter;
SRBTree rbt;
SDataIter dataIter;
SDataIter aDataIter[TSDB_MAX_STT_TRIGGER];
SDataIter aDataIter[TSDB_STT_TRIGGER_ARRAY_SIZE];
int8_t toLastOnly;
};
struct {
@ -865,7 +865,7 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
TSDB_CHECK_CODE(code, lino, _exit);
// merger
for (int32_t iStt = 0; iStt < TSDB_MAX_STT_TRIGGER; iStt++) {
for (int32_t iStt = 0; iStt < TSDB_STT_TRIGGER_ARRAY_SIZE; iStt++) {
SDataIter *pIter = &pCommitter->aDataIter[iStt];
pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
if (pIter->aSttBlk == NULL) {
@ -915,7 +915,7 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) {
tBlockDataDestroy(&pCommitter->dReader.bData);
// merger
for (int32_t iStt = 0; iStt < TSDB_MAX_STT_TRIGGER; iStt++) {
for (int32_t iStt = 0; iStt < TSDB_STT_TRIGGER_ARRAY_SIZE; iStt++) {
SDataIter *pIter = &pCommitter->aDataIter[iStt];
taosArrayDestroy(pIter->aSttBlk);
tBlockDataDestroy(&pIter->bData);

View File

@ -1000,7 +1000,7 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) {
tsdbCloseFile(&(*ppReader)->pSmaFD);
// stt
for (int32_t iStt = 0; iStt < TSDB_MAX_STT_TRIGGER; iStt++) {
for (int32_t iStt = 0; iStt < TSDB_STT_TRIGGER_ARRAY_SIZE; iStt++) {
if ((*ppReader)->aSttFD[iStt]) {
tsdbCloseFile(&(*ppReader)->aSttFD[iStt]);
}

View File

@ -244,7 +244,7 @@ SNode* createResumeStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, boo
SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId);
SNode* createKillQueryStmt(SAstCreateContext* pCxt, const SToken* pQueryId);
SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt);
SNode* createBalanceVgroupLeaderStmt(SAstCreateContext* pCxt);
SNode* createBalanceVgroupLeaderStmt(SAstCreateContext* pCxt, const SToken* pVgId);
SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2);
SNode* createRedistributeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId, SNodeList* pDnodes);
SNode* createSplitVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId);

View File

@ -691,11 +691,16 @@ cmd ::= KILL TRANSACTION NK_INTEGER(A).
/************************************************ merge/redistribute/ vgroup ******************************************/
cmd ::= BALANCE VGROUP. { pCxt->pRootNode = createBalanceVgroupStmt(pCxt); }
cmd ::= BALANCE VGROUP LEADER. { pCxt->pRootNode = createBalanceVgroupLeaderStmt(pCxt); }
cmd ::= BALANCE VGROUP LEADER on_vgroup_id(A). { pCxt->pRootNode = createBalanceVgroupLeaderStmt(pCxt, &A); }
cmd ::= MERGE VGROUP NK_INTEGER(A) NK_INTEGER(B). { pCxt->pRootNode = createMergeVgroupStmt(pCxt, &A, &B); }
cmd ::= REDISTRIBUTE VGROUP NK_INTEGER(A) dnode_list(B). { pCxt->pRootNode = createRedistributeVgroupStmt(pCxt, &A, B); }
cmd ::= SPLIT VGROUP NK_INTEGER(A). { pCxt->pRootNode = createSplitVgroupStmt(pCxt, &A); }
%type on_vgroup_id { SToken }
%destructor on_vgroup_id { }
on_vgroup_id(A) ::= . { A = nil_token; }
on_vgroup_id(A) ::= ON NK_INTEGER(B). { A = B; }
%type dnode_list { SNodeList* }
%destructor dnode_list { nodesDestroyList($$); }
dnode_list(A) ::= DNODE NK_INTEGER(B). { A = createNodeList(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B)); }

View File

@ -2307,10 +2307,13 @@ SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt) {
return (SNode*)pStmt;
}
SNode* createBalanceVgroupLeaderStmt(SAstCreateContext* pCxt) {
SNode* createBalanceVgroupLeaderStmt(SAstCreateContext* pCxt, const SToken* pVgId) {
CHECK_PARSER_STATUS(pCxt);
SBalanceVgroupLeaderStmt* pStmt = (SBalanceVgroupLeaderStmt*)nodesMakeNode(QUERY_NODE_BALANCE_VGROUP_LEADER_STMT);
CHECK_OUT_OF_MEM(pStmt);
if (NULL != pVgId && NULL != pVgId->z) {
pStmt->vgId = taosStr2Int32(pVgId->z, NULL, 10);
}
return (SNode*)pStmt;
}

View File

@ -7439,6 +7439,7 @@ static int32_t translateBalanceVgroup(STranslateContext* pCxt, SBalanceVgroupStm
static int32_t translateBalanceVgroupLeader(STranslateContext* pCxt, SBalanceVgroupLeaderStmt* pStmt) {
SBalanceVgroupLeaderReq req = {0};
req.vgId = pStmt->vgId;
return buildCmdMsg(pCxt, TDMT_MND_BALANCE_VGROUP_LEADER, (FSerializeFunc)tSerializeSBalanceVgroupLeaderReq, &req);
}

File diff suppressed because it is too large Load Diff

View File

@ -3646,15 +3646,15 @@ static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) {
}
static int32_t partitionColsOpt(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SNode* node;
int32_t code = TSDB_CODE_SUCCESS;
SPartitionLogicNode* pNode = (SPartitionLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, partColOptShouldBeOptimized);
SPartitionLogicNode* pNode =
(SPartitionLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, partColOptShouldBeOptimized);
if (NULL == pNode) return TSDB_CODE_SUCCESS;
SLogicNode* pRootNode = getLogicNodeRootNode((SLogicNode*)pNode);
if (!pRootNode->pHint || !getSortForGroupOptHint(pRootNode->pHint)) {
return code;
}
if (pRootNode->pHint && getSortForGroupOptHint(pRootNode->pHint)) {
// replace with sort node
SSortLogicNode* pSort = partColOptCreateSort(pNode);
if (!pSort) {
@ -3672,6 +3672,47 @@ static int32_t partitionColsOpt(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
nodesDestroyNode((SNode*)pSort);
}
}
return code;
} else if (pNode->node.pParent && nodeType(pNode->node.pParent) == QUERY_NODE_LOGIC_PLAN_AGG) {
// Check if we can delete partition node
SAggLogicNode* pAgg = (SAggLogicNode*)pNode->node.pParent;
FOREACH(node, pNode->pPartitionKeys) {
SGroupingSetNode* pgsNode = (SGroupingSetNode*)nodesMakeNode(QUERY_NODE_GROUPING_SET);
if (!pgsNode) code = TSDB_CODE_OUT_OF_MEMORY;
if (code == TSDB_CODE_SUCCESS) {
pgsNode->groupingSetType = GP_TYPE_NORMAL;
pgsNode->pParameterList = nodesMakeList();
if (!pgsNode->pParameterList) code = TSDB_CODE_OUT_OF_MEMORY;
}
if (code == TSDB_CODE_SUCCESS) {
code = nodesListAppend(pgsNode->pParameterList, nodesCloneNode(node));
}
if (code == TSDB_CODE_SUCCESS) {
// Now we are using hash agg
code = nodesListMakeAppend(&pAgg->pGroupKeys, (SNode*)pgsNode);
}
if (code != TSDB_CODE_SUCCESS) {
nodesDestroyNode((SNode*)pgsNode);
break;
}
}
if (code == TSDB_CODE_SUCCESS) {
code =
replaceLogicNode(pLogicSubplan, (SLogicNode*)pNode, (SLogicNode*)nodesListGetNode(pNode->node.pChildren, 0));
NODES_CLEAR_LIST(pNode->node.pChildren);
}
if (code == TSDB_CODE_SUCCESS) {
// For hash agg, nonblocking mode is meaningless, slimit is useless, so we reset it
pAgg->node.forceCreateNonBlockingOptr = false;
nodesDestroyNode(pAgg->node.pSlimit);
pAgg->node.pSlimit = NULL;
nodesDestroyNode((SNode*)pNode);
pCxt->optimized = true;
}
return code;
}
return code;
}

View File

@ -65,11 +65,11 @@ int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key);
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurPrev_rocksdb(SStreamStateCur* pCur);
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key);
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState);
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key);
// func cf
@ -84,10 +84,14 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState);
int32_t streamStateSessionCurPrev_rocksdb(SStreamStateCur* pCur);
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey);
int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
int32_t* pVLen);
int32_t streamStateSessionClear_rocksdb(SStreamState* pState);
int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData,

View File

@ -1970,7 +1970,7 @@ int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* ke
memset(*pVal, 0, size);
return 0;
}
int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur) {
int32_t streamStateCurPrev_rocksdb(SStreamStateCur* pCur) {
qDebug("streamStateCurPrev_rocksdb");
if (!pCur) return -1;
@ -2052,7 +2052,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
return NULL;
}
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) {
qDebug("streamStateGetCur_rocksdb");
int32_t code = 0;
@ -2062,9 +2062,6 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
return NULL;
}
char buf[128] = {0};
int32_t klen = stateKeyEncode((void*)&maxStateKey, buf);
{
char tbuf[256] = {0};
stateKeyToString((void*)&maxStateKey, tbuf);
@ -2079,6 +2076,8 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
char buf[128] = {0};
int32_t klen = stateKeyEncode((void*)&maxStateKey, buf);
rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
rocksdb_iter_prev(pCur->iter);
while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
@ -2183,6 +2182,52 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k
STREAM_STATE_DEL_ROCKSDB(pState, "sess", &sKey);
return code;
}
SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState) {
qDebug("streamStateSessionSeekToLast_rocksdb");
int32_t code = 0;
SSessionKey maxSessionKey = {.groupId = UINT64_MAX, .win = {.skey = INT64_MAX, .ekey = INT64_MAX}};
SStateSessionKey maxKey = {.key = maxSessionKey, .opNum = INT64_MAX};
STREAM_STATE_PUT_ROCKSDB(pState, "sess", &maxKey, "", 0);
if (code != 0) {
return NULL;
}
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = createStreamStateCursor();
pCur->number = pState->number;
pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
char buf[128] = {0};
int32_t klen = stateSessionKeyEncode((void*)&maxKey, buf);
rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
rocksdb_iter_prev(pCur->iter);
while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
rocksdb_iter_prev(pCur->iter);
}
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
pCur = NULL;
}
STREAM_STATE_DEL_ROCKSDB(pState, "sess", &maxKey);
return pCur;
}
int32_t streamStateSessionCurPrev_rocksdb(SStreamStateCur* pCur) {
qDebug("streamStateCurPrev_rocksdb");
if (!pCur) return -1;
rocksdb_iter_prev(pCur->iter);
return 0;
}
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");
@ -2301,6 +2346,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con
}
return pCur;
}
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
qDebug("streamStateSessionGetKVByCur_rocksdb");
if (!pCur) {

View File

@ -670,7 +670,7 @@ int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB
return streamStateCurPrev_rocksdb(pState, pCur);
return streamStateCurPrev_rocksdb(pCur);
#else
if (!pCur) {
return -1;

View File

@ -58,7 +58,7 @@ struct SStreamFileState {
typedef SRowBuffPos SRowBuffInfo;
int32_t stateHashBuffRemoveFn(void* pBuff, const void *pKey, size_t keyLen) {
int32_t stateHashBuffRemoveFn(void* pBuff, const void* pKey, size_t keyLen) {
SRowBuffPos** pos = tSimpleHashGet(pBuff, pKey, keyLen);
if (pos) {
(*pos)->beFlushed = true;
@ -77,13 +77,9 @@ int32_t stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pP
return TSDB_CODE_SUCCESS;
}
void stateHashBuffClearFn(void* pBuff) {
tSimpleHashClear(pBuff);
}
void stateHashBuffClearFn(void* pBuff) { tSimpleHashClear(pBuff); }
void stateHashBuffCleanupFn(void* pBuff) {
tSimpleHashCleanup(pBuff);
}
void stateHashBuffCleanupFn(void* pBuff) { tSimpleHashCleanup(pBuff); }
int32_t intervalFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
return streamStateDel_rocksdb(pFileState->pFileStore, pKey);
@ -118,8 +114,8 @@ void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) {
}
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId,
int64_t checkpointId, int8_t type) {
GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId,
int8_t type) {
if (memSize <= 0) {
memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
}
@ -178,7 +174,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
pFileState->maxTs = INT64_MIN;
pFileState->id = taosStrdup(taskId);
//todo(liuyao) optimize
// todo(liuyao) optimize
if (type == STREAM_STATE_BUFF_HASH) {
recoverSnapshot(pFileState, checkpointId);
}
@ -290,9 +286,7 @@ void streamFileStateClear(SStreamFileState* pFileState) {
bool needClearDiskBuff(SStreamFileState* pFileState) { return pFileState->flushMark > 0; }
void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) {
pPos->beUsed = used;
}
void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) { pPos->beUsed = used; }
void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
uint64_t i = 0;
@ -647,6 +641,26 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
return code;
}
int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId) {
int code = TSDB_CODE_SUCCESS;
SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileState->pFileStore);
if (pCur == NULL) {
return -1;
}
while (code == TSDB_CODE_SUCCESS) {
void* pVal = NULL;
int32_t vlen = 0;
SSessionKey key = {0};
code = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &vlen);
if (code != 0) {
break;
}
taosMemoryFree(pVal);
code = streamStateSessionCurPrev_rocksdb(pCur);
}
streamStateFreeCur(pCur);
return code;
}
int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
int32_t code = TSDB_CODE_SUCCESS;
if (pFileState->maxTs != INT64_MIN) {
@ -656,8 +670,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
deleteExpiredCheckPoint(pFileState, mark);
}
SWinKey key = {.groupId = 0, .ts = 0};
SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore, &key);
SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore);
if (pCur == NULL) {
return -1;
}
@ -667,9 +680,9 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
break;
}
void* pVal = NULL;
int32_t pVLen = 0;
int32_t vlen = 0;
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &pVLen);
code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
if (code != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
destroyRowBuffPos(pNewPos);
SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
@ -677,8 +690,8 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
taosMemoryFreeClear(pVal);
break;
}
ASSERT(pVLen == pFileState->rowSize);
memcpy(pNewPos->pRowBuff, pVal, pVLen);
ASSERT(vlen == pFileState->rowSize);
memcpy(pNewPos->pRowBuff, pVal, vlen);
taosMemoryFreeClear(pVal);
pNewPos->beFlushed = true;
code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
@ -686,7 +699,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
destroyRowBuffPos(pNewPos);
break;
}
code = streamStateCurPrev_rocksdb(pFileState->pFileStore, pCur);
code = streamStateCurPrev_rocksdb(pCur);
}
streamStateFreeCur(pCur);
@ -700,22 +713,14 @@ void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) {
pFileState->maxTs = TMAX(pFileState->maxTs, ts);
}
void* getRowStateBuff(SStreamFileState* pFileState) {
return pFileState->rowStateBuff;
}
void* getRowStateBuff(SStreamFileState* pFileState) { return pFileState->rowStateBuff; }
void* getStateFileStore(SStreamFileState* pFileState) {
return pFileState->pFileStore;
}
void* getStateFileStore(SStreamFileState* pFileState) { return pFileState->pFileStore; }
bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
return pFileState->deleteMark > 0 && ts < (pFileState->maxTs - pFileState->deleteMark);
}
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) {
return ts <= (pFileState->flushMark + gap);
}
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); }
int32_t getRowStateRowSize(SStreamFileState* pFileState) {
return pFileState->rowSize;
}
int32_t getRowStateRowSize(SStreamFileState* pFileState) { return pFileState->rowSize; }

View File

@ -26,6 +26,10 @@ void tmsgSetDefault(const SMsgCb* msgcb) { defaultMsgCb = *msgcb; }
int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg) {
int32_t code = (*msgcb->putToQueueFp)(msgcb->mgmt, qtype, pMsg);
if (code != 0) {
SRpcMsg rsp = {.code = code, .info = pMsg->info};
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}

View File

@ -56,6 +56,8 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond, int64_t id) {
}
void walCloseReader(SWalReader *pReader) {
if(pReader == NULL) return;
taosCloseFile(&pReader->pIdxFile);
taosCloseFile(&pReader->pLogFile);
taosMemoryFreeClear(pReader->pHead);

View File

@ -48,6 +48,7 @@ void createUsers(TAOS *taos, const char *host, char *qstr);
void passVerTestMulti(const char *host, char *qstr);
void sysInfoTest(TAOS *taos, const char *host, char *qstr);
void userDroppedTest(TAOS *taos, const char *host, char *qstr);
void clearTestEnv(TAOS *taos, const char *host, char *qstr);
int nPassVerNotified = 0;
int nUserDropped = 0;
@ -210,6 +211,7 @@ int main(int argc, char *argv[]) {
passVerTestMulti(argv[1], qstr);
sysInfoTest(taos, argv[1], qstr);
userDroppedTest(taos, argv[1], qstr);
clearTestEnv(taos, argv[1], qstr);
taos_close(taos);
taos_cleanup();
@ -267,9 +269,9 @@ void passVerTestMulti(const char *host, char *qstr) {
queryDB(taos[0], "create database if not exists demo2 vgroups 1 minrows 10");
queryDB(taos[0], "create database if not exists demo3 vgroups 1 minrows 10");
queryDB(taos[0], "create table demo1.stb (ts timestamp, c1 int) tags(t1 int)");
queryDB(taos[0], "create table demo2.stb (ts timestamp, c1 int) tags(t1 int)");
queryDB(taos[0], "create table demo3.stb (ts timestamp, c1 int) tags(t1 int)");
queryDB(taos[0], "create table if not exists demo1.stb (ts timestamp, c1 int) tags(t1 int)");
queryDB(taos[0], "create table if not exists demo2.stb (ts timestamp, c1 int) tags(t1 int)");
queryDB(taos[0], "create table if not exists demo3.stb (ts timestamp, c1 int) tags(t1 int)");
strcpy(qstr, "alter user root pass 'taos'");
queryDB(taos[0], qstr);
@ -326,9 +328,9 @@ void sysInfoTest(TAOS *taosRoot, const char *host, char *qstr) {
queryDB(taosRoot, "create database if not exists demo12 vgroups 1 minrows 10");
queryDB(taosRoot, "create database if not exists demo13 vgroups 1 minrows 10");
queryDB(taosRoot, "create table demo11.stb (ts timestamp, c1 int) tags(t1 int)");
queryDB(taosRoot, "create table demo12.stb (ts timestamp, c1 int) tags(t1 int)");
queryDB(taosRoot, "create table demo13.stb (ts timestamp, c1 int) tags(t1 int)");
queryDB(taosRoot, "create table if not exists demo11.stb (ts timestamp, c1 int) tags(t1 int)");
queryDB(taosRoot, "create table if not exists demo12.stb (ts timestamp, c1 int) tags(t1 int)");
queryDB(taosRoot, "create table if not exists demo13.stb (ts timestamp, c1 int) tags(t1 int)");
sprintf(qstr, "show grants");
char output[BUF_LEN];
@ -387,10 +389,14 @@ _REP:
fprintf(stderr, ">>> succeed to run sysInfoTest\n");
fprintf(stderr, "######## %s #########\n", __func__);
}
static bool isDropUser = true;
void userDroppedTest(TAOS *taos, const char *host, char *qstr) {
// users
int nTestUsers = nUser;
int nLoop = 0;
_loop:
++nLoop;
printf("\n\n%s:%d LOOP %d, nTestUsers:%d\n", __func__, __LINE__, nLoop, nTestUsers);
for (int i = 0; i < nTestUsers; ++i) {
// sprintf(users[i], "user%d", i);
taosu[i] = taos_connect(host, users[i], "taos", NULL, 0);
@ -426,7 +432,6 @@ void userDroppedTest(TAOS *taos, const char *host, char *qstr) {
for (int i = 0; i < nTestUsers; ++i) {
taos_close(taosu[i]);
printf("%s:%d close taosu[%d]\n", __func__, __LINE__, i);
sleep(1);
}
fprintf(stderr, "######## %s #########\n", __func__);
@ -437,5 +442,32 @@ void userDroppedTest(TAOS *taos, const char *host, char *qstr) {
exit(1);
}
fprintf(stderr, "######## %s #########\n", __func__);
// sleep(300);
if (nLoop < 5) {
nUserDropped = 0;
for (int i = 0; i < nTestUsers; ++i) {
sprintf(users[i], "user%d", i);
sprintf(qstr, "CREATE USER %s PASS 'taos'", users[i]);
fprintf(stderr, "%s:%d create user:%s\n", __func__, __LINE__, users[i]);
queryDB(taos, qstr);
}
goto _loop;
}
isDropUser = false;
}
void clearTestEnv(TAOS *taos, const char *host, char *qstr) {
fprintf(stderr, "######## %s start #########\n", __func__);
// restore password
sprintf(qstr, "alter user root pass 'taosdata'");
queryDB(taos, qstr);
if (isDropUser) {
for (int i = 0; i < nUser; ++i) {
sprintf(qstr, "drop user %s", users[i]);
queryDB(taos, qstr);
}
}
// sleep(3000);
fprintf(stderr, "######## %s end #########\n", __func__);
}

View File

@ -141,12 +141,12 @@ class TDTestCase:
def test_sort_for_partition_hint(self):
sql = 'select count(*), c1 from meters partition by c1'
sql_hint = 'select /*+ sort_for_group() */count(*), c1 from meters partition by c1'
self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
#self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))
sql = 'select count(*), c1, tbname from meters partition by tbname, c1'
sql_hint = 'select /*+ sort_for_group() */ count(*), c1, tbname from meters partition by tbname, c1'
self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
#self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))
sql = 'select count(*), c1, tbname from meters partition by tbname, c1 interval(1s)'
@ -156,7 +156,7 @@ class TDTestCase:
sql = 'select count(*), c1, t1 from meters partition by t1, c1'
sql_hint = 'select /*+ sort_for_group() */ count(*), c1, t1 from meters partition by t1, c1'
self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
#self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))
sql = 'select count(*), c1, t1 from meters partition by t1, c1 interval(1s)'
@ -208,7 +208,7 @@ class TDTestCase:
sql_hint = self.add_order_by(self.add_hint(sql), order_by, select_list)
sql = self.add_order_by(sql, order_by, select_list)
self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))
self.check_explain_res_has_row("Partition", self.explain_sql(sql))
#self.check_explain_res_has_row("Partition", self.explain_sql(sql))
self.query_and_compare_res(sql, sql_hint, compare_what=compare_what)
def test_sort_for_partition_res(self):

View File

@ -137,6 +137,10 @@ class TDTestCase:
if not plan_found:
tdLog.exit("plan: %s not found in res: [%s]" % (plan_str_expect, str(rows)))
def check_explain_res_no_row(self, plan_str_not_expect: str, res):
for row in res:
if str(row).find(plan_str_not_expect) >= 0:
tdLog.exit('plan: [%s] found in: [%s]' % (plan_str_not_expect, str(row)))
def test_sort_for_partition_hint(self):
pass
@ -147,6 +151,9 @@ class TDTestCase:
def add_hint(self, sql: str) -> str:
return "select /*+ sort_for_group() */ %s" % sql[6:]
def add_remove_partition_hint(self, sql: str) -> str:
return "select /*+ remove_partition() */ %s" % sql[6:]
def query_with_time(self, sql):
start = datetime.now()
tdSql.query(sql, queryTimes=1)
@ -200,8 +207,8 @@ class TDTestCase:
def check_explain(self, sql):
sql_hint = self.add_hint(sql)
explain_res = self.explain_sql(sql)
self.check_explain_res_has_row('SortMerge', explain_res)
self.check_explain_res_has_row("blocking=0", explain_res)
#self.check_explain_res_has_row('SortMerge', explain_res)
#self.check_explain_res_has_row("blocking=0", explain_res)
explain_res = self.explain_sql(sql_hint)
self.check_explain_res_has_row('SortMerge', explain_res)
self.check_explain_res_has_row('blocking=0', explain_res)
@ -233,14 +240,27 @@ class TDTestCase:
sql_hint = self.add_hint(sql)
sql = self.add_order_by(sql, ele[3])
sql_no_slimit = sql_template % (ele[0], ele[1], '')
sql_no_slimit = self.add_order_by(sql_no_slimit, ele[3])
self.query_and_compare_first_rows(sql_hint, sql_no_slimit)
def test_remove_partition(self):
sql = 'select c1, count(*) from meters partition by c1 slimit 10'
explain_res = self.explain_sql(sql)
self.check_explain_res_no_row("Partition", explain_res)
self.check_explain_res_has_row("blocking=1", explain_res)
sql = 'select c1, count(*) from meters partition by c1,c2 slimit 10'
explain_res = self.explain_sql(sql)
self.check_explain_res_no_row("Partition", explain_res)
self.check_explain_res_has_row("blocking=1", explain_res)
def run(self):
self.prepareTestEnv()
#time.sleep(99999999)
self.test_pipelined_agg_plan_with_slimit()
self.test_pipelined_agg_data_with_slimit()
self.test_remove_partition()
def stop(self):
tdSql.close()

View File

@ -107,8 +107,8 @@ class RestoreBasic:
status = True
break
else:
time.sleep(0.5)
tdLog.info(f"sleep 500ms retry {i} to check status again...")
time.sleep(1)
tdLog.info(f"sleep 1s retry {i} to check status again...")
if status == False:
tdLog.exit("check vgroups status failed, exit.")