diff --git a/README-CN.md b/README-CN.md
index 4931c0177e..06ac087859 100644
--- a/README-CN.md
+++ b/README-CN.md
@@ -12,7 +12,7 @@
[](https://travis-ci.org/taosdata/TDengine)
[](https://ci.appveyor.com/project/sangshuduo/tdengine-2n8ge/branch/master)
-[](https://coveralls.io/github/taosdata/TDengine?branch=develop)
+[](https://coveralls.io/github/taosdata/TDengine?branch=3.0)
[](https://bestpractices.coreinfrastructure.org/projects/4201)
简体中文 | [English](README.md) | [TDengine 云服务](https://cloud.taosdata.com/?utm_medium=cn&utm_source=github) | 很多职位正在热招中,请看[这里](https://www.taosdata.com/cn/careers/)
diff --git a/README.md b/README.md
index 31d3a8bf67..e390b5e764 100644
--- a/README.md
+++ b/README.md
@@ -12,7 +12,7 @@
[](https://cloud.drone.io/taosdata/TDengine)
[](https://ci.appveyor.com/project/sangshuduo/tdengine-2n8ge/branch/master)
-[](https://coveralls.io/github/taosdata/TDengine?branch=develop)
+[](https://coveralls.io/github/taosdata/TDengine?branch=3.0)
[](https://bestpractices.coreinfrastructure.org/projects/4201)
[](https://twitter.com/tdenginedb)
diff --git a/docs/en/12-taos-sql/10-function.md b/docs/en/12-taos-sql/10-function.md
index fbdae3445b..b4f1cf65da 100644
--- a/docs/en/12-taos-sql/10-function.md
+++ b/docs/en/12-taos-sql/10-function.md
@@ -491,6 +491,8 @@ TO_CHAR(ts, format_str_literal)
**Description**: Convert a ts column to string as the format specified
+**Version**: Since ver-3.2.2.0
+
**Return value type**: VARCHAR
**Applicable column types**: TIMESTAMP
@@ -550,6 +552,8 @@ TO_TIMESTAMP(ts_str_literal, format_str_literal)
**Description**: Convert a formated timestamp string to a timestamp
+**Version**: Since ver-3.2.2.0
+
**Return value type**: TIMESTAMP
**Applicable column types**: VARCHAR
diff --git a/docs/en/12-taos-sql/18-escape.md b/docs/en/12-taos-sql/18-escape.md
index a44b21db43..2d067b2ad9 100644
--- a/docs/en/12-taos-sql/18-escape.md
+++ b/docs/en/12-taos-sql/18-escape.md
@@ -7,14 +7,14 @@ description: This document describes the usage of escape characters in TDengine.
| Escape Character | **Actual Meaning** |
| :--------------: | ------------------------ |
-| `\'` | Single quote ' |
-| `\"` | Double quote " |
-| \n | Line Break |
-| \r | Carriage Return |
-| \t | tab |
-| `\\` | Back Slash \ |
-| `\%` | % see below for details |
-| `\_` | \_ see below for details |
+| `\'` | Single quote `'` |
+| `\"` | Double quote `"` |
+| `\n` | Line Break |
+| `\r` | Carriage Return |
+| `\t` | tab |
+| `\\` | Back Slash `\ ` |
+| `\%` | `%` see below for details |
+| `\_` | `_` see below for details |
## Restrictions
@@ -22,5 +22,5 @@ description: This document describes the usage of escape characters in TDengine.
- Identifier without ``: Error will be returned because identifier must be constituted of digits, ASCII characters or underscore and can't be started with digits
- Identifier quoted with ``: Original content is kept, no escaping
2. If there are escape characters in values
- - The escape characters will be escaped as the above table. If the escape character doesn't match any supported one, the escape character "\" will be ignored.
- - "%" and "\_" are used as wildcards in `like`. `\%` and `\_` should be used to represent literal "%" and "\_" in `like`,. If `\%` and `\_` are used out of `like` context, the evaluation result is "`\%`"and "`\_`", instead of "%" and "\_".
+ - The escape characters will be escaped as the above table. If the escape character doesn't match any supported one, the escape character `\ ` will be ignored(`\x` remaining).
+ - `%` and `_` are used as wildcards in `like`. `\%` and `\_` should be used to represent literal `%` and `_` in `like`. If `\%` and `\_` are used out of `like` context, the evaluation result is `\%` and `\_`, instead of `%` and `_`.
diff --git a/docs/zh/12-taos-sql/10-function.md b/docs/zh/12-taos-sql/10-function.md
index 66322d55f1..0482022d95 100644
--- a/docs/zh/12-taos-sql/10-function.md
+++ b/docs/zh/12-taos-sql/10-function.md
@@ -491,6 +491,8 @@ TO_CHAR(ts, format_str_literal)
**功能说明**: 将timestamp类型按照指定格式转换为字符串
+**版本**: ver-3.2.2.0
+
**返回结果数据类型**: VARCHAR
**应用字段**: TIMESTAMP
@@ -550,6 +552,8 @@ TO_TIMESTAMP(ts_str_literal, format_str_literal)
**功能说明**: 将字符串按照指定格式转化为时间戳.
+**版本**: ver-3.2.2.0
+
**返回结果数据类型**: TIMESTAMP
**应用字段**: VARCHAR
diff --git a/docs/zh/12-taos-sql/18-escape.md b/docs/zh/12-taos-sql/18-escape.md
index 5e0d292d39..81e4179042 100644
--- a/docs/zh/12-taos-sql/18-escape.md
+++ b/docs/zh/12-taos-sql/18-escape.md
@@ -8,16 +8,15 @@ description: TDengine 中使用转义字符的详细规则
| 字符序列 | **代表的字符** |
| :------: | -------------- |
-| `\'` | 单引号' |
-| `\"` | 双引号" |
-| \n | 换行符 |
-| \r | 回车符 |
-| \t | tab 符 |
-| `\\` | 斜杠\ |
-| `\%` | % 规则见下 |
-| `\_` | \_ 规则见下 |
+| `\'` | 单引号`'` |
+| `\"` | 双引号`"` |
+| `\n` | 换行符 |
+| `\r` | 回车符 |
+| `\t` | tab 符 |
+| `\\` | 斜杠 `\ ` |
+| `\%` | `%` 规则见下 |
+| `\_` | `_` 规则见下 |
-:::
## 转义字符使用规则
@@ -25,5 +24,5 @@ description: TDengine 中使用转义字符的详细规则
1. 普通标识符: 直接提示错误的标识符,因为标识符规定必须是数字、字母和下划线,并且不能以数字开头。
2. 反引号``标识符: 保持原样,不转义
2. 数据里有转义字符
- 1. 遇到上面定义的转义字符会转义(%和\_见下面说明),如果没有匹配的转义字符会忽略掉转义符\。
- 2. 对于%和\_,因为在 like 里这两个字符是通配符,所以在模式匹配 like 里用`\%`%和`\_`表示字符里本身的%和\_,如果在 like 模式匹配上下文之外使用`\%`或`\_`,则它们的计算结果为字符串`\%`和`\_`,而不是%和\_。
+ 1. 遇到上面定义的转义字符会转义(`%`和`_`见下面说明),如果没有匹配的转义字符会忽略掉转义符`\ `(`\x`保持原样)。
+ 2. 对于`%`和`_`,因为在`like`里这两个字符是通配符,所以在模式匹配`like`里用`\%`和`\_`表示字符里本身的`%`和`_`,如果在`like`模式匹配上下文之外使用`\%`或`\_`,则它们的计算结果为字符串`\%`和`\_`,而不是`%`和`_`。
diff --git a/include/common/tcommon.h b/include/common/tcommon.h
index 24e5d186b9..d4537ddc89 100644
--- a/include/common/tcommon.h
+++ b/include/common/tcommon.h
@@ -206,6 +206,7 @@ typedef struct SDataBlockInfo {
int16_t hasVarCol;
int16_t dataLoad; // denote if the data is loaded or not
uint8_t scanFlag;
+ bool blankFill;
// TODO: optimize and remove following
int64_t version; // used for stream, and need serialization
diff --git a/include/common/tgrant.h b/include/common/tgrant.h
index f06fca8014..cfb1401698 100644
--- a/include/common/tgrant.h
+++ b/include/common/tgrant.h
@@ -48,6 +48,7 @@ typedef enum {
TSDB_GRANT_CPU_CORES,
TSDB_GRANT_STABLE,
TSDB_GRANT_TABLE,
+ TSDB_GRANT_SUBSCRIBE,
} EGrantType;
int32_t grantCheck(EGrantType grant);
diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index 2ee48a18e0..69677e6bc1 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -3331,7 +3331,7 @@ typedef struct {
SMsgHead head;
int64_t streamId;
int32_t taskId;
-} SVPauseStreamTaskReq, SVResetStreamTaskReq, SVDropHTaskReq;
+} SVPauseStreamTaskReq, SVResetStreamTaskReq;
typedef struct {
int8_t reserved;
@@ -3754,7 +3754,12 @@ typedef struct {
} SMqHbReq;
typedef struct {
- int8_t reserved;
+ char topic[TSDB_TOPIC_FNAME_LEN];
+ int8_t noPrivilege;
+} STopicPrivilege;
+
+typedef struct {
+ SArray* topicPrivileges; // SArray
} SMqHbRsp;
typedef struct {
@@ -3773,18 +3778,6 @@ typedef struct {
SVCreateTbReq cTbReq;
} SVSubmitBlk;
-typedef struct {
- int32_t flags;
- int32_t nBlocks;
- union {
- SArray* pArray;
- SVSubmitBlk* pBlocks;
- };
-} SVSubmitReq;
-
-int32_t tEncodeSVSubmitReq(SEncoder* pCoder, const SVSubmitReq* pReq);
-int32_t tDecodeSVSubmitReq(SDecoder* pCoder, SVSubmitReq* pReq);
-
typedef struct {
SMsgHead header;
uint64_t sId;
@@ -3893,6 +3886,10 @@ int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
int32_t tDeatroySMqHbReq(SMqHbReq* pReq);
+int32_t tSerializeSMqHbRsp(void* buf, int32_t bufLen, SMqHbRsp* pRsp);
+int32_t tDeserializeSMqHbRsp(void* buf, int32_t bufLen, SMqHbRsp* pRsp);
+int32_t tDeatroySMqHbRsp(SMqHbRsp* pRsp);
+
int32_t tSerializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
diff --git a/include/libs/function/function.h b/include/libs/function/function.h
index 8863201094..0fa84c99c6 100644
--- a/include/libs/function/function.h
+++ b/include/libs/function/function.h
@@ -114,6 +114,7 @@ typedef struct SInputColumnInfoData {
int32_t totalRows; // total rows in current columnar data
int32_t startRowIndex; // handle started row index
int64_t numOfRows; // the number of rows needs to be handled
+ bool blankFill; // fill blank data to block for empty table
int32_t numOfInputCols; // PTS is not included
bool colDataSMAIsSet; // if agg is set or not
SColumnInfoData *pPTS; // primary timestamp column
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 9b3ce36bdd..747ba34c97 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -324,12 +324,13 @@ typedef struct SStreamStatus {
int8_t taskStatus;
int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set
int8_t schedStatus;
- int32_t schedIdleTime; // idle time before invoke again
- int64_t lastExecTs; // last exec time stamp
int8_t statusBackup;
- bool appendTranstateBlock; // has append the transfer state data block already
- int32_t timerActive; // timer is active
+ int32_t schedIdleTime; // idle time before invoke again
+ int32_t timerActive; // timer is active
+ int64_t lastExecTs; // last exec time stamp
int32_t inScanHistorySentinel;
+ bool appendTranstateBlock; // has append the transfer state data block already
+ bool supplementaryWalscan; // complete the supplementary wal scan or not
} SStreamStatus;
typedef struct SDataRange {
diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c
index 8ac9550aca..36a3e50aef 100644
--- a/source/client/src/clientStmt.c
+++ b/source/client/src/clientStmt.c
@@ -406,10 +406,6 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
if (pStmt->bInfo.inExecCache) {
- if (ASSERT(taosHashGetSize(pStmt->exec.pBlockHash) == 1)) {
- tscError("stmtGetFromCache error");
- return TSDB_CODE_TSC_STMT_CACHE_ERROR;
- }
pStmt->bInfo.needParse = false;
tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
return TSDB_CODE_SUCCESS;
diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c
index 69681b9ae0..8b424a7bf7 100644
--- a/source/client/src/clientTmq.c
+++ b/source/client/src/clientTmq.c
@@ -155,6 +155,7 @@ typedef struct {
char db[TSDB_DB_FNAME_LEN];
SArray* vgs; // SArray
SSchemaWrapper schema;
+ int8_t noPrivilege;
} SMqClientTopic;
typedef struct {
@@ -739,6 +740,29 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
if (pMsg) {
+ SMqHbRsp rsp = {0};
+ tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp);
+
+ int64_t refId = *(int64_t*)param;
+ tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
+ if (tmq != NULL) {
+ taosWLockLatch(&tmq->lock);
+ for(int32_t i = 0; i < taosArrayGetSize(rsp.topicPrivileges); i++){
+ STopicPrivilege* privilege = taosArrayGet(rsp.topicPrivileges, i);
+ if(privilege->noPrivilege == 1){
+ int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
+ for (int32_t j = 0; j < topicNumCur; j++) {
+ SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
+ if(strcmp(pTopicCur->topicName, privilege->topic) == 0){
+ tscInfo("consumer:0x%" PRIx64 ", has no privilege, topic:%s", tmq->consumerId, privilege->topic);
+ pTopicCur->noPrivilege = 1;
+ }
+ }
+ }
+ }
+ taosWUnLockLatch(&tmq->lock);
+ }
+ tDeatroySMqHbRsp(&rsp);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
}
@@ -809,7 +833,9 @@ void tmqSendHbReq(void* param, void* tmrId) {
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
- sendInfo->param = NULL;
+ sendInfo->paramFreeFp = taosMemoryFree;
+ sendInfo->param = taosMemoryMalloc(sizeof(int64_t));
+ *(int64_t *)sendInfo->param = refId;
sendInfo->fp = tmqHbCb;
sendInfo->msgType = TDMT_MND_TMQ_HB;
@@ -1705,7 +1731,10 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
for (int i = 0; i < numOfTopics; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
int32_t numOfVg = taosArrayGetSize(pTopic->vgs);
-
+ if(pTopic->noPrivilege){
+ tscDebug("consumer:0x%" PRIx64 " has no privilegr for topic:%s", tmq->consumerId, pTopic->topicName);
+ continue;
+ }
for (int j = 0; j < numOfVg; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 10ms
diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c
index a7d80fe5db..53169019b6 100644
--- a/source/common/src/tglobal.c
+++ b/source/common/src/tglobal.c
@@ -58,7 +58,7 @@ int32_t tsNumOfMnodeQueryThreads = 4;
int32_t tsNumOfMnodeFetchThreads = 1;
int32_t tsNumOfMnodeReadThreads = 1;
int32_t tsNumOfVnodeQueryThreads = 4;
-float tsRatioOfVnodeStreamThreads = 1.0;
+float tsRatioOfVnodeStreamThreads = 0.5F;
int32_t tsNumOfVnodeFetchThreads = 4;
int32_t tsNumOfVnodeRsmaThreads = 2;
int32_t tsNumOfQnodeQueryThreads = 4;
@@ -622,7 +622,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
0)
return -1;
- if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 10, CFG_SCOPE_SERVER,
+ if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 4, CFG_SCOPE_SERVER,
CFG_DYN_NONE) != 0)
return -1;
diff --git a/source/common/src/tgrant.c b/source/common/src/tgrant.c
index 74a59fd580..f212d71362 100644
--- a/source/common/src/tgrant.c
+++ b/source/common/src/tgrant.c
@@ -18,6 +18,6 @@
#ifndef _GRANT
-int32_t grantCheck(EGrantType grant) { return TSDB_CODE_SUCCESS; }
+int32_t grantCheck(EGrantType grant) {return TSDB_CODE_SUCCESS;}
#endif
\ No newline at end of file
diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c
index c9e2908e8a..3f1cfbc87f 100644
--- a/source/common/src/tmsg.c
+++ b/source/common/src/tmsg.c
@@ -6139,6 +6139,55 @@ int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) {
return 0;
}
+int32_t tDeatroySMqHbRsp(SMqHbRsp *pRsp) {
+ taosArrayDestroy(pRsp->topicPrivileges);
+ return 0;
+}
+
+int32_t tSerializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) {
+ SEncoder encoder = {0};
+ tEncoderInit(&encoder, buf, bufLen);
+ if (tStartEncode(&encoder) < 0) return -1;
+
+ int32_t sz = taosArrayGetSize(pRsp->topicPrivileges);
+ if (tEncodeI32(&encoder, sz) < 0) return -1;
+ for (int32_t i = 0; i < sz; ++i) {
+ STopicPrivilege *privilege = (STopicPrivilege *)taosArrayGet(pRsp->topicPrivileges, i);
+ if (tEncodeCStr(&encoder, privilege->topic) < 0) return -1;
+ if (tEncodeI8(&encoder, privilege->noPrivilege) < 0) return -1;
+ }
+
+ tEndEncode(&encoder);
+
+ int32_t tlen = encoder.pos;
+ tEncoderClear(&encoder);
+
+ return tlen;
+}
+
+int32_t tDeserializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) {
+ SDecoder decoder = {0};
+ tDecoderInit(&decoder, (char *)buf, bufLen);
+
+ if (tStartDecode(&decoder) < 0) return -1;
+
+ int32_t sz = 0;
+ if (tDecodeI32(&decoder, &sz) < 0) return -1;
+ if (sz > 0) {
+ pRsp->topicPrivileges = taosArrayInit(sz, sizeof(STopicPrivilege));
+ if (NULL == pRsp->topicPrivileges) return -1;
+ for (int32_t i = 0; i < sz; ++i) {
+ STopicPrivilege *data = taosArrayReserve(pRsp->topicPrivileges, 1);
+ if (tDecodeCStrTo(&decoder, data->topic) < 0) return -1;
+ if (tDecodeI8(&decoder, &data->noPrivilege) < 0) return -1;
+ }
+ }
+ tEndDecode(&decoder);
+
+ tDecoderClear(&decoder);
+ return 0;
+}
+
int32_t tDeatroySMqHbReq(SMqHbReq *pReq) {
for (int i = 0; i < taosArrayGetSize(pReq->topics); i++) {
TopicOffsetRows *vgs = taosArrayGet(pReq->topics, i);
@@ -6194,7 +6243,7 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
if (NULL == pReq->topics) return -1;
for (int32_t i = 0; i < sz; ++i) {
TopicOffsetRows *data = taosArrayReserve(pReq->topics, 1);
- tDecodeCStrTo(&decoder, data->topicName);
+ if (tDecodeCStrTo(&decoder, data->topicName) < 0) return -1;
int32_t szVgs = 0;
if (tDecodeI32(&decoder, &szVgs) < 0) return -1;
if (szVgs > 0) {
@@ -7753,36 +7802,6 @@ static int32_t tDecodeSVSubmitBlk(SDecoder *pCoder, SVSubmitBlk *pBlock, int32_t
return 0;
}
-int32_t tEncodeSVSubmitReq(SEncoder *pCoder, const SVSubmitReq *pReq) {
- int32_t nBlocks = taosArrayGetSize(pReq->pArray);
-
- if (tStartEncode(pCoder) < 0) return -1;
-
- if (tEncodeI32v(pCoder, pReq->flags) < 0) return -1;
- if (tEncodeI32v(pCoder, nBlocks) < 0) return -1;
- for (int32_t iBlock = 0; iBlock < nBlocks; iBlock++) {
- if (tEncodeSVSubmitBlk(pCoder, (SVSubmitBlk *)taosArrayGet(pReq->pArray, iBlock), pReq->flags) < 0) return -1;
- }
-
- tEndEncode(pCoder);
- return 0;
-}
-
-int32_t tDecodeSVSubmitReq(SDecoder *pCoder, SVSubmitReq *pReq) {
- if (tStartDecode(pCoder) < 0) return -1;
-
- if (tDecodeI32v(pCoder, &pReq->flags) < 0) return -1;
- if (tDecodeI32v(pCoder, &pReq->nBlocks) < 0) return -1;
- pReq->pBlocks = tDecoderMalloc(pCoder, sizeof(SVSubmitBlk) * pReq->nBlocks);
- if (pReq->pBlocks == NULL) return -1;
- for (int32_t iBlock = 0; iBlock < pReq->nBlocks; iBlock++) {
- if (tDecodeSVSubmitBlk(pCoder, pReq->pBlocks + iBlock, pReq->flags) < 0) return -1;
- }
-
- tEndDecode(pCoder);
- return 0;
-}
-
static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBlock) {
if (tStartEncode(pEncoder) < 0) return -1;
diff --git a/source/dnode/mnode/impl/inc/mndPrivilege.h b/source/dnode/mnode/impl/inc/mndPrivilege.h
index 4a8fb20715..6f74ea3b36 100644
--- a/source/dnode/mnode/impl/inc/mndPrivilege.h
+++ b/source/dnode/mnode/impl/inc/mndPrivilege.h
@@ -30,7 +30,6 @@ int32_t mndCheckDbPrivilege(SMnode *pMnode, const char *user, EOperType operType
int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *dbname);
int32_t mndCheckViewPrivilege(SMnode *pMnode, const char *user, EOperType operType, const char *pViewFName);
int32_t mndCheckTopicPrivilege(SMnode *pMnode, const char *user, EOperType operType, SMqTopicObj *pTopic);
-int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *topicName);
int32_t mndCheckShowPrivilege(SMnode *pMnode, const char *user, EShowType showType, const char *dbname);
int32_t mndCheckAlterUserPrivilege(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter);
int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp);
diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h
index 92035101f6..372612274f 100644
--- a/source/dnode/mnode/impl/inc/mndStream.h
+++ b/source/dnode/mnode/impl/inc/mndStream.h
@@ -26,9 +26,17 @@ extern "C" {
#define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_VER_NUMBER 4
+#define MND_STREAM_CREATE_NAME "stream-create"
+#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
+#define MND_STREAM_PAUSE_NAME "stream-pause"
+#define MND_STREAM_RESUME_NAME "stream-resume"
+#define MND_STREAM_DROP_NAME "stream-drop"
+#define MND_STREAM_TASK_RESET_NAME "stream-task-reset"
+#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update"
+
typedef struct SStreamTransInfo {
int64_t startTime;
- int64_t streamUid;
+ int64_t streamId;
const char *name;
int32_t transId;
} SStreamTransInfo;
@@ -41,7 +49,7 @@ typedef struct SVgroupChangeInfo {
// time to generated the checkpoint, if now() - checkpointTs >= tsCheckpointInterval, this checkpoint will be discard
// to avoid too many checkpoints for a taskk in the waiting list
typedef struct SCheckpointCandEntry {
- char * pName;
+ char *pName;
int64_t streamId;
int64_t checkpointTs;
int64_t checkpointId;
@@ -62,6 +70,9 @@ typedef struct SStreamExecInfo {
SHashObj *pTransferStateStreams;
} SStreamExecInfo;
+extern SStreamExecInfo execInfo;
+typedef struct SStreamTaskIter SStreamTaskIter;
+
typedef struct SNodeEntry {
int32_t nodeId;
bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot.
@@ -69,21 +80,11 @@ typedef struct SNodeEntry {
int64_t hbTimestamp; // second
} SNodeEntry;
-typedef struct SFailedCheckpointInfo {
- int64_t streamUid;
- int64_t checkpointId;
- int32_t transId;
-} SFailedCheckpointInfo;
-
-#define MND_STREAM_CREATE_NAME "stream-create"
-#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
-#define MND_STREAM_PAUSE_NAME "stream-pause"
-#define MND_STREAM_RESUME_NAME "stream-resume"
-#define MND_STREAM_DROP_NAME "stream-drop"
-#define MND_STREAM_TASK_RESET_NAME "stream-task-reset"
-#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update"
-
-extern SStreamExecInfo execInfo;
+typedef struct SOrphanTask {
+ int64_t streamId;
+ int32_t taskId;
+ int32_t nodeId;
+} SOrphanTask;
int32_t mndInitStream(SMnode *pMnode);
void mndCleanupStream(SMnode *pMnode);
@@ -91,35 +92,38 @@ SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName);
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream);
+int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId);
+int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId);
+bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock);
+int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId);
-int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamUid);
-int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId);
-bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamUid, const char *pTransName, bool lock);
-int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamUid);
-
-// for sma
-// TODO refactor
-int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
-int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams);
-int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream);
-SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
-void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
-int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
- int32_t retryCode);
-STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg);
-int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status);
-SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
-void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);
-int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
+int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams);
+int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream);
+SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
+void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
+int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
+ int32_t retryCode);
+STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg);
+int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status);
+SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
+void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);
+int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);
int32_t mndProcessStreamHb(SRpcMsg *pReq);
void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode);
int32_t initStreamNodeList(SMnode *pMnode);
-int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj* pStream, int8_t igUntreated);
+int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated);
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
+int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
+int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList);
+int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
+SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream);
+void destroyStreamTaskIter(SStreamTaskIter *pIter);
+bool streamTaskIterNextTask(SStreamTaskIter *pIter);
+SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter);
#ifdef __cplusplus
}
diff --git a/source/dnode/mnode/impl/src/mndCompact.c b/source/dnode/mnode/impl/src/mndCompact.c
index 101022a44f..4e71684372 100644
--- a/source/dnode/mnode/impl/src/mndCompact.c
+++ b/source/dnode/mnode/impl/src/mndCompact.c
@@ -599,7 +599,8 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
pDetail->compactId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
pDetail->newNumberFileset, pDetail->newFinished);
- if(pDetail->numberFileset < pDetail->newNumberFileset || pDetail->finished < pDetail->newFinished)
+ //these 2 number will jump back after dnode restart, so < is not used here
+ if(pDetail->numberFileset != pDetail->newNumberFileset || pDetail->finished != pDetail->newFinished)
needSave = true;
}
diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c
index 4db000287c..a730c6ed84 100644
--- a/source/dnode/mnode/impl/src/mndConsumer.c
+++ b/source/dnode/mnode/impl/src/mndConsumer.c
@@ -101,8 +101,9 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *
goto FAILED;
}
- if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) {
- code = -1;
+ if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0 || grantCheck(TSDB_GRANT_SUBSCRIBE) < 0) {
+ code = TSDB_CODE_MND_NO_RIGHTS;
+ terrno = TSDB_CODE_MND_NO_RIGHTS;
goto FAILED;
}
@@ -220,22 +221,53 @@ FAIL:
return -1;
}
+static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, char* user){
+ rsp->topicPrivileges = taosArrayInit(taosArrayGetSize(pConsumer->currentTopics), sizeof(STopicPrivilege));
+ if(rsp->topicPrivileges == NULL){
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ return terrno;
+ }
+ for(int32_t i = 0; i < taosArrayGetSize(pConsumer->currentTopics); i++){
+ char *topic = taosArrayGetP(pConsumer->currentTopics, i);
+ SMqTopicObj* pTopic = mndAcquireTopic(pMnode, topic);
+ if (pTopic == NULL) { // terrno has been set by callee function
+ continue;
+ }
+ STopicPrivilege *data = taosArrayReserve(rsp->topicPrivileges, 1);
+ strcpy(data->topic, topic);
+ if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 || grantCheck(TSDB_GRANT_SUBSCRIBE) < 0) {
+ data->noPrivilege = 1;
+ } else{
+ data->noPrivilege = 0;
+ }
+ mndReleaseTopic(pMnode, pTopic);
+ }
+ return 0;
+}
+
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
int32_t code = 0;
SMnode *pMnode = pMsg->info.node;
SMqHbReq req = {0};
+ SMqHbRsp rsp = {0};
+ SMqConsumerObj *pConsumer = NULL;
- if ((code = tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req)) < 0) {
+ if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
+ code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
int64_t consumerId = req.consumerId;
- SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
+ pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) {
mError("consumer:0x%" PRIx64 " not exist", consumerId);
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
- code = -1;
+ code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
+ goto end;
+ }
+ code = checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user);
+ if(code != 0){
goto end;
}
@@ -280,9 +312,22 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
mndReleaseSubscribe(pMnode, pSub);
}
- mndReleaseConsumer(pMnode, pConsumer);
+ // encode rsp
+ int32_t tlen = tSerializeSMqHbRsp(NULL, 0, &rsp);
+ void *buf = rpcMallocCont(tlen);
+ if (buf == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ code = TSDB_CODE_OUT_OF_MEMORY;
+ goto end;
+ }
+
+ tSerializeSMqHbRsp(buf, tlen, &rsp);
+ pMsg->info.rsp = buf;
+ pMsg->info.rspLen = tlen;
end:
+ tDeatroySMqHbRsp(&rsp);
+ mndReleaseConsumer(pMnode, pConsumer);
tDeatroySMqHbReq(&req);
return code;
}
@@ -500,6 +545,11 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
char *msgStr = pMsg->pCont;
+ if(grantCheck(TSDB_GRANT_SUBSCRIBE) < 0){
+ terrno = TSDB_CODE_GRANT_EXPIRED;
+ return -1;
+ }
+
SCMSubscribeReq subscribe = {0};
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
@@ -525,7 +575,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
}
// check topic existence
- pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
+ pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "subscribe");
if (pTrans == NULL) {
goto _over;
}
diff --git a/source/dnode/mnode/impl/src/mndPrivilege.c b/source/dnode/mnode/impl/src/mndPrivilege.c
index d4c0a6b36b..13a80cb1a6 100644
--- a/source/dnode/mnode/impl/src/mndPrivilege.c
+++ b/source/dnode/mnode/impl/src/mndPrivilege.c
@@ -30,9 +30,6 @@ int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType op
}
int32_t mndCheckTopicPrivilege(SMnode *pMnode, const char *user, EOperType operType, SMqTopicObj *pTopic) { return 0; }
-int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *topicName) {
- return 0;
-}
int32_t mndSetUserWhiteListRsp(SMnode *pMnode, SUserObj *pUser, SGetUserWhiteListRsp *pWhiteListRsp) {
diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c
index a89136e7d3..e6027a0332 100644
--- a/source/dnode/mnode/impl/src/mndSma.c
+++ b/source/dnode/mnode/impl/src/mndSma.c
@@ -865,7 +865,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
sdbRelease(pMnode->pSdb, pStream);
goto _OVER;
} else {
- if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
+ if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
goto _OVER;
@@ -917,7 +917,7 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p
SStreamObj *pStream = mndAcquireStream(pMnode, streamName);
if (pStream != NULL && pStream->smaId == pSma->uid) {
- if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
+ if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
mndReleaseStream(pMnode, pStream);
goto _OVER;
diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c
index 7b348172f2..0de951596f 100644
--- a/source/dnode/mnode/impl/src/mndStream.c
+++ b/source/dnode/mnode/impl/src/mndStream.c
@@ -476,22 +476,20 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
}
int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) {
- int32_t level = taosArrayGetSize(pStream->tasks);
- for (int32_t i = 0; i < level; i++) {
- SArray *pLevel = taosArrayGetP(pStream->tasks, i);
-
- int32_t numOfTasks = taosArrayGetSize(pLevel);
- for (int32_t j = 0; j < numOfTasks; j++) {
- SStreamTask *pTask = taosArrayGetP(pLevel, j);
- if (mndPersistTaskDeployReq(pTrans, pTask) < 0) {
- return -1;
- }
+ SStreamTaskIter *pIter = createStreamTaskIter(pStream);
+ while (streamTaskIterNextTask(pIter)) {
+ SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
+ if (mndPersistTaskDeployReq(pTrans, pTask) < 0) {
+ destroyStreamTaskIter(pIter);
+ return -1;
}
}
+ destroyStreamTaskIter(pIter);
+
// persistent stream task for already stored ts data
if (pStream->conf.fillHistory) {
- level = taosArrayGetSize(pStream->pHTasksList);
+ int32_t level = taosArrayGetSize(pStream->pHTasksList);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->pHTasksList, i);
@@ -608,50 +606,6 @@ _OVER:
return -1;
}
-static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
- SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
- if (pReq == NULL) {
- terrno = TSDB_CODE_OUT_OF_MEMORY;
- return -1;
- }
-
- pReq->head.vgId = htonl(pTask->info.nodeId);
- pReq->taskId = pTask->id.taskId;
- pReq->streamId = pTask->id.streamId;
-
- SEpSet epset = {0};
- bool hasEpset = false;
- int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
- if (code != TSDB_CODE_SUCCESS || !hasEpset) { // no valid epset, return directly without redoAction
- terrno = code;
- return -1;
- }
-
- // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
- code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0);
- if (code != 0) {
- taosMemoryFree(pReq);
- return -1;
- }
-
- return 0;
-}
-
-int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
- int32_t lv = taosArrayGetSize(pStream->tasks);
- for (int32_t i = 0; i < lv; i++) {
- SArray *pTasks = taosArrayGetP(pStream->tasks, i);
- int32_t sz = taosArrayGetSize(pTasks);
- for (int32_t j = 0; j < sz; j++) {
- SStreamTask *pTask = taosArrayGetP(pTasks, j);
- if (mndPersistTaskDropReq(pMnode, pTrans, pTask) < 0) {
- return -1;
- }
- }
- }
- return 0;
-}
-
static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { // check for number of existed tasks
int32_t numOfStream = 0;
SStreamObj *pStream = NULL;
@@ -690,6 +644,11 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
int32_t sqlLen = 0;
terrno = TSDB_CODE_SUCCESS;
+ if(grantCheck(TSDB_GRANT_STREAMS) < 0){
+ terrno = TSDB_CODE_GRANT_STREAM_LIMITED;
+ return -1;
+ }
+
SCMCreateStreamReq createStreamReq = {0};
if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
@@ -900,6 +859,32 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int
return 0;
}
+static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId,
+ int8_t mndTrigger) {
+ void *buf;
+ int32_t tlen;
+ if (mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
+ pTask->id.taskId, pTrans->id, mndTrigger) < 0) {
+ taosMemoryFree(buf);
+ return -1;
+ }
+
+ SEpSet epset = {0};
+ bool hasEpset = false;
+ int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
+ if (code != TSDB_CODE_SUCCESS || !hasEpset) {
+ taosMemoryFree(buf);
+ return -1;
+ }
+
+ code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY);
+ if (code != 0) {
+ taosMemoryFree(buf);
+ }
+
+ return code;
+}
+
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
int8_t mndTrigger, bool lock) {
int32_t code = -1;
@@ -909,6 +894,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
return -1;
}
+
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
if (conflict) {
mndAddtoCheckpointWaitingList(pStream, checkpointId);
@@ -931,8 +917,8 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
pStream->currentTick = 1;
// 1. redo action: broadcast checkpoint source msg for all source vg
- int32_t totLevel = taosArrayGetSize(pStream->tasks);
- for (int32_t i = 0; i < totLevel; i++) {
+ int32_t totalLevel = taosArrayGetSize(pStream->tasks);
+ for (int32_t i = 0; i < totalLevel; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTask *p = taosArrayGetP(pLevel, 0);
@@ -940,28 +926,9 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
int32_t sz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
+ code = doSetCheckpointAction(pMnode, pTrans, pTask, checkpointId, mndTrigger);
- void *buf;
- int32_t tlen;
- if (mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
- pTask->id.taskId, pTrans->id, mndTrigger) < 0) {
- taosWUnLockLatch(&pStream->lock);
- goto _ERR;
- }
-
- SEpSet epset = {0};
- bool hasEpset = false;
- code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
- if (code != TSDB_CODE_SUCCESS || !hasEpset) {
- taosMemoryFree(buf);
- taosWUnLockLatch(&pStream->lock);
- goto _ERR;
- }
-
- code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset,
- TSDB_CODE_SYN_PROPOSE_NOT_READY);
- if (code != 0) {
- taosMemoryFree(buf);
+ if (code != TSDB_CODE_SUCCESS) {
taosWUnLockLatch(&pStream->lock);
goto _ERR;
}
@@ -1200,7 +1167,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid);
// drop all tasks
- if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
+ if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
@@ -1264,7 +1231,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
return -1;
} else {
#if 0
- if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
+ if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
sdbCancelFetch(pSdb, pIter);
@@ -1544,21 +1511,19 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
}
// add row for each task
- for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) {
- SArray *pLevel = taosArrayGetP(pStream->tasks, i);
+ SStreamTaskIter *pIter = createStreamTaskIter(pStream);
+ while (streamTaskIterNextTask(pIter)) {
+ SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
- int32_t numOfLevels = taosArrayGetSize(pLevel);
- for (int32_t j = 0; j < numOfLevels; j++) {
- SStreamTask *pTask = taosArrayGetP(pLevel, j);
- int32_t code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows);
- if (code == TSDB_CODE_SUCCESS) {
- numOfRows++;
- }
+ int32_t code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows);
+ if (code == TSDB_CODE_SUCCESS) {
+ numOfRows++;
}
}
- // unlock
+ destroyStreamTaskIter(pIter);
taosRUnLockLatch(&pStream->lock);
+
sdbRelease(pSdb, pStream);
}
@@ -1664,21 +1629,26 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
- SMResumeStreamReq pauseReq = {0};
- if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
+ if(grantCheck(TSDB_GRANT_STREAMS) < 0){
+ terrno = TSDB_CODE_GRANT_EXPIRED;
+ return -1;
+ }
+
+ SMResumeStreamReq resumeReq = {0};
+ if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
- pStream = mndAcquireStream(pMnode, pauseReq.name);
+ pStream = mndAcquireStream(pMnode, resumeReq.name);
if (pStream == NULL) {
- if (pauseReq.igNotExists) {
- mInfo("stream:%s not exist, not resume stream", pauseReq.name);
+ if (resumeReq.igNotExists) {
+ mInfo("stream:%s not exist, not resume stream", resumeReq.name);
sdbRelease(pMnode->pSdb, pStream);
return 0;
} else {
- mError("stream:%s not exist, failed to resume stream", pauseReq.name);
+ mError("stream:%s not exist, failed to resume stream", resumeReq.name);
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
return -1;
}
@@ -1703,7 +1673,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_RESUME_NAME, "resume the stream");
if (pTrans == NULL) {
- mError("stream:%s, failed to resume stream since %s", pauseReq.name, terrstr());
+ mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
@@ -1711,8 +1681,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
// set the resume action
- if (mndStreamSetResumeAction(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) {
- mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr());
+ if (mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated) < 0) {
+ mError("stream:%s, failed to drop task since %s", resumeReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
@@ -1898,22 +1868,19 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) {
}
taosWLockLatch(&pStream->lock);
- int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
- for (int32_t j = 0; j < numOfLevels; ++j) {
- SArray *pLevel = taosArrayGetP(pStream->tasks, j);
+ SStreamTaskIter *pTaskIter = createStreamTaskIter(pStream);
+ while (streamTaskIterNextTask(pTaskIter)) {
+ SStreamTask *pTask = streamTaskIterGetCurrent(pTaskIter);
- int32_t numOfTasks = taosArrayGetSize(pLevel);
- for (int32_t k = 0; k < numOfTasks; ++k) {
- SStreamTask *pTask = taosArrayGetP(pLevel, k);
-
- SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId};
- epsetAssign(&entry.epset, &pTask->info.epSet);
- taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
- }
+ SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId};
+ epsetAssign(&entry.epset, &pTask->info.epSet);
+ taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
}
+ destroyStreamTaskIter(pTaskIter);
taosWUnLockLatch(&pStream->lock);
+
sdbRelease(pSdb, pStream);
}
@@ -2099,58 +2066,50 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
}
void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
- int32_t level = taosArrayGetSize(pStream->tasks);
+ SStreamTaskIter *pIter = createStreamTaskIter(pStream);
+ while (streamTaskIterNextTask(pIter)) {
+ SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
- for (int32_t i = 0; i < level; i++) {
- SArray *pLevel = taosArrayGetP(pStream->tasks, i);
+ STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
+ void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
+ if (p == NULL) {
+ STaskStatusEntry entry = {0};
+ streamTaskStatusInit(&entry, pTask);
- int32_t numOfTasks = taosArrayGetSize(pLevel);
- for (int32_t j = 0; j < numOfTasks; j++) {
- SStreamTask *pTask = taosArrayGetP(pLevel, j);
-
- STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
- void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
- if (p == NULL) {
- STaskStatusEntry entry = {0};
- streamTaskStatusInit(&entry, pTask);
-
- taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
- taosArrayPush(pExecNode->pTaskList, &id);
- mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId,
- (int32_t)taosArrayGetSize(pExecNode->pTaskList));
- }
+ taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
+ taosArrayPush(pExecNode->pTaskList, &id);
+ mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId,
+ (int32_t)taosArrayGetSize(pExecNode->pTaskList));
}
}
+
+ destroyStreamTaskIter(pIter);
}
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
- int32_t level = taosArrayGetSize(pStream->tasks);
- for (int32_t i = 0; i < level; i++) {
- SArray *pLevel = taosArrayGetP(pStream->tasks, i);
+ SStreamTaskIter *pIter = createStreamTaskIter(pStream);
+ while (streamTaskIterNextTask(pIter)) {
+ SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
- int32_t numOfTasks = taosArrayGetSize(pLevel);
- for (int32_t j = 0; j < numOfTasks; j++) {
- SStreamTask *pTask = taosArrayGetP(pLevel, j);
+ STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
+ void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
+ if (p != NULL) {
+ taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id));
- STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
- void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
- if (p != NULL) {
- taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id));
+ for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
+ STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
+ if (pId->taskId == id.taskId && pId->streamId == id.streamId) {
+ taosArrayRemove(pExecNode->pTaskList, k);
- for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
- STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
- if (pId->taskId == id.taskId && pId->streamId == id.streamId) {
- taosArrayRemove(pExecNode->pTaskList, k);
-
- int32_t num = taosArrayGetSize(pExecNode->pTaskList);
- mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId, num);
- break;
- }
+ int32_t num = taosArrayGetSize(pExecNode->pTaskList);
+ mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId, num);
+ break;
}
}
}
}
+ destroyStreamTaskIter(pIter);
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
}
@@ -2181,7 +2140,6 @@ static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numO
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
-
SStreamTaskCheckpointReq req = {0};
SDecoder decoder = {0};
diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c
index e4599edbd4..4426ab0672 100644
--- a/source/dnode/mnode/impl/src/mndStreamHb.c
+++ b/source/dnode/mnode/impl/src/mndStreamHb.c
@@ -16,6 +16,12 @@
#include "mndStream.h"
#include "mndTrans.h"
+typedef struct SFailedCheckpointInfo {
+ int64_t streamUid;
+ int64_t checkpointId;
+ int32_t transId;
+} SFailedCheckpointInfo;
+
static void doExtractTasksFromStream(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
@@ -59,61 +65,24 @@ static void addIntoCheckpointList(SArray* pList, const SFailedCheckpointInfo* pI
taosArrayPush(pList, pInfo);
}
-static int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
+int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint");
if (pTrans == NULL) {
return terrno;
}
/*int32_t code = */mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_RESET_NAME, pStream->uid);
-
- taosWLockLatch(&pStream->lock);
- int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
-
- for (int32_t j = 0; j < numOfLevels; ++j) {
- SArray *pLevel = taosArrayGetP(pStream->tasks, j);
-
- int32_t numOfTasks = taosArrayGetSize(pLevel);
- for (int32_t k = 0; k < numOfTasks; ++k) {
- SStreamTask *pTask = taosArrayGetP(pLevel, k);
-
- // todo extract method, with pause stream task
- SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
- if (pReq == NULL) {
- terrno = TSDB_CODE_OUT_OF_MEMORY;
- mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq),
- tstrerror(TSDB_CODE_OUT_OF_MEMORY));
- taosWUnLockLatch(&pStream->lock);
- return terrno;
- }
-
- pReq->head.vgId = htonl(pTask->info.nodeId);
- pReq->taskId = pTask->id.taskId;
- pReq->streamId = pTask->id.streamId;
-
- SEpSet epset = {0};
- bool hasEpset = false;
- int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
- if (code != TSDB_CODE_SUCCESS || !hasEpset) {
- taosMemoryFree(pReq);
- continue;
- }
-
- code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0);
- if (code != 0) {
- taosMemoryFree(pReq);
- taosWUnLockLatch(&pStream->lock);
- mndTransDrop(pTrans);
- return terrno;
- }
- }
+ int32_t code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream);
+ if (code != 0) {
+ sdbRelease(pMnode->pSdb, pStream);
+ mndTransDrop(pTrans);
+ return code;
}
- taosWUnLockLatch(&pStream->lock);
-
- int32_t code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
+ code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pMnode->pSdb, pStream);
+ mndTransDrop(pTrans);
return -1;
}
@@ -177,10 +146,90 @@ static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
return TSDB_CODE_SUCCESS;
}
+static int32_t mndDropOrphanTasks(SMnode* pMnode, SArray* pList) {
+ SOrphanTask* pTask = taosArrayGet(pList, 0);
+
+ // check if it is conflict with other trans in both sourceDb and targetDb.
+ bool conflict = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
+ if (conflict) {
+ return -1;
+ }
+
+ SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};
+ STrans* pTrans = doCreateTrans(pMnode, &dummyObj, NULL, MND_STREAM_DROP_NAME, "drop stream");
+ if (pTrans == NULL) {
+ mError("failed to create trans to drop orphan tasks since %s", terrstr());
+ return -1;
+ }
+
+ int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId);
+
+ // drop all tasks
+ if (mndStreamSetDropActionFromList(pMnode, pTrans, pList) < 0) {
+ mError("failed to create trans to drop orphan tasks since %s", terrstr());
+ mndTransDrop(pTrans);
+ return -1;
+ }
+
+ // drop stream
+ if (mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED) < 0) {
+ mndTransDrop(pTrans);
+ return -1;
+ }
+
+ if (mndTransPrepare(pMnode, pTrans) != 0) {
+ mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr());
+ mndTransDrop(pTrans);
+ return -1;
+ }
+ mndTransDrop(pTrans);
+ return 0;
+}
+
+int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){
+ SSdb *pSdb = pMnode->pSdb;
+ SStreamObj *pStream = NULL;
+ void* pIter = NULL;
+ while(1) {
+ pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
+ if (pIter == NULL) break;
+
+ if(pStream->status != STREAM_STATUS__PAUSE){
+ SMPauseStreamReq reqPause = {0};
+ strcpy(reqPause.name, pStream->name);
+ reqPause.igNotExists = 1;
+
+ int32_t contLen = tSerializeSMPauseStreamReq(NULL, 0, &reqPause);
+ void * pHead = rpcMallocCont(contLen);
+ tSerializeSMPauseStreamReq(pHead, contLen, &reqPause);
+
+ SRpcMsg rpcMsg = {
+ .msgType = TDMT_MND_PAUSE_STREAM,
+ .pCont = pHead,
+ .contLen = contLen,
+ .info = *info,
+ };
+
+ tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
+ mInfo("receive pause stream:%s, %s, %p, because grant expired", pStream->name, reqPause.name, reqPause.name);
+ }
+
+ sdbRelease(pSdb, pStream);
+ }
+ return 0;
+}
+
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0};
- SArray *pList = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
+ SArray *pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
+ SArray *pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask));
+
+ if(grantCheck(TSDB_GRANT_STREAMS) < 0){
+ if(suspendAllStreams(pMnode, &pReq->info) < 0){
+ return -1;
+ }
+ }
SDecoder decoder = {0};
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
@@ -198,8 +247,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
taosThreadMutexLock(&execInfo.lock);
// extract stream task list
- int32_t numOfExisted = taosHashGetSize(execInfo.pTaskMap);
- if (numOfExisted == 0) {
+ if (taosHashGetSize(execInfo.pTaskMap) == 0) {
doExtractTasksFromStream(pMnode);
}
@@ -218,6 +266,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
if (pTaskEntry == NULL) {
mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId);
+
+ SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId};
+ taosArrayPush(pOrphanTasks, &oTask);
continue;
}
@@ -240,15 +291,13 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
streamTaskStatusCopy(pTaskEntry, p);
- if (p->checkpointId != 0) {
- if (p->checkpointFailed) {
- mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId,
- p->checkpointId, p->chkpointTransId);
+ if ((p->checkpointId != 0) && p->checkpointFailed) {
+ mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId,
+ p->checkpointId, p->chkpointTransId);
- SFailedCheckpointInfo info = {
- .transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId};
- addIntoCheckpointList(pList, &info);
- }
+ SFailedCheckpointInfo info = {
+ .transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId};
+ addIntoCheckpointList(pFailedTasks, &info);
}
}
@@ -266,15 +315,15 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// current checkpoint is failed, rollback from the checkpoint trans
// kill the checkpoint trans and then set all tasks status to be normal
- if (taosArrayGetSize(pList) > 0) {
+ if (taosArrayGetSize(pFailedTasks) > 0) {
bool allReady = true;
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
taosArrayDestroy(p);
if (allReady || snodeChanged) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
- for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
- SFailedCheckpointInfo *pInfo = taosArrayGet(pList, i);
+ for(int32_t i = 0; i < taosArrayGetSize(pFailedTasks); ++i) {
+ SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedTasks, i);
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
pInfo->checkpointId, pInfo->transId);
@@ -285,9 +334,16 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
}
+ // handle the orphan tasks that are invalid but not removed in some vnodes or snode due to some unknown errors.
+ if (taosArrayGetSize(pOrphanTasks) > 0) {
+ mndDropOrphanTasks(pMnode, pOrphanTasks);
+ }
+
taosThreadMutexUnlock(&execInfo.lock);
streamMetaClearHbMsg(&req);
- taosArrayDestroy(pList);
+ taosArrayDestroy(pFailedTasks);
+ taosArrayDestroy(pOrphanTasks);
+
return TSDB_CODE_SUCCESS;
}
diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c
index 0a7397827e..5bfd3933b5 100644
--- a/source/dnode/mnode/impl/src/mndStreamTrans.c
+++ b/source/dnode/mnode/impl/src/mndStreamTrans.c
@@ -23,10 +23,10 @@ typedef struct SKeyInfo {
static int32_t clearFinishedTrans(SMnode* pMnode);
-int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamUid) {
+int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamId) {
SStreamTransInfo info = {
- .transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pTransName, .streamUid = streamUid};
- taosHashPut(execInfo.transMgmt.pDBTrans, &streamUid, sizeof(streamUid), &info, sizeof(SStreamTransInfo));
+ .transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pTransName, .streamId = streamId};
+ taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId), &info, sizeof(SStreamTransInfo));
return 0;
}
@@ -65,7 +65,7 @@ int32_t clearFinishedTrans(SMnode* pMnode) {
return 0;
}
-bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char* pTransName, bool lock) {
+bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamId, const char* pTransName, bool lock) {
if (lock) {
taosThreadMutexLock(&execInfo.lock);
}
@@ -80,7 +80,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char*
clearFinishedTrans(pMnode);
- SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamUid, sizeof(streamUid));
+ SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
if (pEntry != NULL) {
SStreamTransInfo tInfo = *pEntry;
@@ -90,7 +90,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char*
if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0)) {
- mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid,
+ mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
tInfo.name);
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
return true;
@@ -99,13 +99,13 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char*
}
} else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) ||
(strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0)) {
- mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid,
+ mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
tInfo.name);
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
return true;
}
} else {
- mDebug("stream:0x%"PRIx64" no conflict trans existed, continue create trans", streamUid);
+ mDebug("stream:0x%"PRIx64" no conflict trans existed, continue create trans", streamId);
}
if (lock) {
@@ -301,86 +301,4 @@ void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
mDebug("complete clear checkpoints in Dbs");
}
-static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId,
- int32_t transId) {
- pMsg->streamId = pId->streamId;
- pMsg->taskId = pId->taskId;
- pMsg->transId = transId;
- pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
- taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList);
-}
-static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
- SStreamTaskId *pId, int32_t transId) {
- SStreamTaskNodeUpdateMsg req = {0};
- initNodeUpdateMsg(&req, pInfo, pId, transId);
-
- int32_t code = 0;
- int32_t blen;
-
- tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code);
- if (code < 0) {
- terrno = TSDB_CODE_OUT_OF_MEMORY;
- taosArrayDestroy(req.pNodeList);
- return -1;
- }
-
- int32_t tlen = sizeof(SMsgHead) + blen;
-
- void *buf = taosMemoryMalloc(tlen);
- if (buf == NULL) {
- terrno = TSDB_CODE_OUT_OF_MEMORY;
- taosArrayDestroy(req.pNodeList);
- return -1;
- }
-
- void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
- SEncoder encoder;
- tEncoderInit(&encoder, abuf, tlen);
- tEncodeStreamTaskUpdateMsg(&encoder, &req);
-
- SMsgHead *pMsgHead = (SMsgHead *)buf;
- pMsgHead->contLen = htonl(tlen);
- pMsgHead->vgId = htonl(nodeId);
-
- tEncoderClear(&encoder);
-
- *pBuf = buf;
- *pLen = tlen;
-
- taosArrayDestroy(req.pNodeList);
- return TSDB_CODE_SUCCESS;
-}
-
-// todo extract method: traverse stream tasks
-// build trans to update the epset
-int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
- mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);
-
- taosWLockLatch(&pStream->lock);
- int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
-
- for (int32_t j = 0; j < numOfLevels; ++j) {
- SArray *pLevel = taosArrayGetP(pStream->tasks, j);
-
- int32_t numOfTasks = taosArrayGetSize(pLevel);
- for (int32_t k = 0; k < numOfTasks; ++k) {
- SStreamTask *pTask = taosArrayGetP(pLevel, k);
-
- void *pBuf = NULL;
- int32_t len = 0;
- streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
- doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
-
- int32_t code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet, 0);
- if (code != TSDB_CODE_SUCCESS) {
- taosMemoryFree(pBuf);
- taosWUnLockLatch(&pStream->lock);
- return -1;
- }
- }
- }
-
- taosWUnLockLatch(&pStream->lock);
- return 0;
-}
\ No newline at end of file
diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c
index 2ee73528e0..235c604b27 100644
--- a/source/dnode/mnode/impl/src/mndStreamUtil.c
+++ b/source/dnode/mnode/impl/src/mndStreamUtil.c
@@ -18,6 +18,66 @@
#include "tmisce.h"
#include "mndVgroup.h"
+struct SStreamTaskIter {
+ SStreamObj *pStream;
+ int32_t level;
+ int32_t ordinalIndex;
+ int32_t totalLevel;
+ SStreamTask *pTask;
+};
+
+SStreamTaskIter* createStreamTaskIter(SStreamObj* pStream) {
+ SStreamTaskIter* pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter));
+ if (pIter == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ return NULL;
+ }
+
+ pIter->level = -1;
+ pIter->ordinalIndex = 0;
+ pIter->pStream = pStream;
+ pIter->totalLevel = taosArrayGetSize(pStream->tasks);
+ pIter->pTask = NULL;
+
+ return pIter;
+}
+
+bool streamTaskIterNextTask(SStreamTaskIter* pIter) {
+ if (pIter->level >= pIter->totalLevel) {
+ pIter->pTask = NULL;
+ return false;
+ }
+
+ if (pIter->level == -1) {
+ pIter->level += 1;
+ }
+
+ while(pIter->level < pIter->totalLevel) {
+ SArray *pList = taosArrayGetP(pIter->pStream->tasks, pIter->level);
+ if (pIter->ordinalIndex >= taosArrayGetSize(pList)) {
+ pIter->level += 1;
+ pIter->ordinalIndex = 0;
+ pIter->pTask = NULL;
+ continue;
+ }
+
+ pIter->pTask = taosArrayGetP(pList, pIter->ordinalIndex);
+ pIter->ordinalIndex += 1;
+ return true;
+ }
+
+ pIter->pTask = NULL;
+ return false;
+}
+
+SStreamTask* streamTaskIterGetCurrent(SStreamTaskIter* pIter) {
+ return pIter->pTask;
+}
+
+void destroyStreamTaskIter(SStreamTaskIter* pIter) {
+ taosMemoryFree(pIter);
+}
+
SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
@@ -175,18 +235,16 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT
}
SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) {
- for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) {
- SArray *pLevel = taosArrayGetP(pStream->tasks, i);
-
- int32_t numOfLevels = taosArrayGetSize(pLevel);
- for (int32_t j = 0; j < numOfLevels; j++) {
- SStreamTask *pTask = taosArrayGetP(pLevel, j);
- if (pTask->id.taskId == pId->taskId) {
- return pTask;
- }
+ SStreamTaskIter *pIter = createStreamTaskIter(pStream);
+ while (streamTaskIterNextTask(pIter)) {
+ SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
+ if (pTask->id.taskId == pId->taskId) {
+ destroyStreamTaskIter(pIter);
+ return pTask;
}
}
+ destroyStreamTaskIter(pIter);
return NULL;
}
@@ -201,21 +259,20 @@ int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
}
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) {
- int32_t size = taosArrayGetSize(pStream->tasks);
- for (int32_t i = 0; i < size; i++) {
- SArray *pTasks = taosArrayGetP(pStream->tasks, i);
- int32_t sz = taosArrayGetSize(pTasks);
- for (int32_t j = 0; j < sz; j++) {
- SStreamTask *pTask = taosArrayGetP(pTasks, j);
- if (doSetResumeAction(pTrans, pMnode, pTask, igUntreated) < 0) {
- return -1;
- }
+ SStreamTaskIter *pIter = createStreamTaskIter(pStream);
- if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) {
- atomic_store_8(&pTask->status.taskStatus, pTask->status.statusBackup);
- }
+ while (streamTaskIterNextTask(pIter)) {
+ SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
+ if (doSetResumeAction(pTrans, pMnode, pTask, igUntreated) < 0) {
+ destroyStreamTaskIter(pIter);
+ return -1;
+ }
+
+ if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) {
+ atomic_store_8(&pTask->status.taskStatus, pTask->status.statusBackup);
}
}
+ destroyStreamTaskIter(pIter);
return 0;
}
@@ -251,24 +308,238 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
}
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
- SArray *tasks = pStream->tasks;
+ SStreamTaskIter *pIter = createStreamTaskIter(pStream);
- int32_t size = taosArrayGetSize(tasks);
- for (int32_t i = 0; i < size; i++) {
- SArray *pTasks = taosArrayGetP(tasks, i);
- int32_t sz = taosArrayGetSize(pTasks);
- for (int32_t j = 0; j < sz; j++) {
- SStreamTask *pTask = taosArrayGetP(pTasks, j);
+ while (streamTaskIterNextTask(pIter)) {
+ SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
+ if (doSetPauseAction(pMnode, pTrans, pTask) < 0) {
+ destroyStreamTaskIter(pIter);
+ return -1;
+ }
- if (doSetPauseAction(pMnode, pTrans, pTask) < 0) {
- return -1;
- }
-
- if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) {
- atomic_store_8(&pTask->status.statusBackup, pTask->status.taskStatus);
- atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
- }
+ if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) {
+ atomic_store_8(&pTask->status.statusBackup, pTask->status.taskStatus);
+ atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
}
}
+
+ destroyStreamTaskIter(pIter);
return 0;
-}
\ No newline at end of file
+}
+
+static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
+ SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
+ if (pReq == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ return -1;
+ }
+
+ pReq->head.vgId = htonl(pTask->info.nodeId);
+ pReq->taskId = pTask->id.taskId;
+ pReq->streamId = pTask->id.streamId;
+
+ SEpSet epset = {0};
+ bool hasEpset = false;
+ int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
+ if (code != TSDB_CODE_SUCCESS || !hasEpset) { // no valid epset, return directly without redoAction
+ terrno = code;
+ return -1;
+ }
+
+ // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
+ code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0);
+ if (code != 0) {
+ taosMemoryFree(pReq);
+ return -1;
+ }
+
+ return 0;
+}
+
+int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
+ SStreamTaskIter *pIter = createStreamTaskIter(pStream);
+
+ while(streamTaskIterNextTask(pIter)) {
+ SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
+ if (doSetDropAction(pMnode, pTrans, pTask) < 0) {
+ destroyStreamTaskIter(pIter);
+ return -1;
+ }
+ }
+ destroyStreamTaskIter(pIter);
+ return 0;
+}
+
+static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask* pTask) {
+ SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
+ if (pReq == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ return -1;
+ }
+
+ pReq->head.vgId = htonl(pTask->nodeId);
+ pReq->taskId = pTask->taskId;
+ pReq->streamId = pTask->streamId;
+
+ SEpSet epset = {0};
+ bool hasEpset = false;
+ int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->taskId, pTask->nodeId);
+ if (code != TSDB_CODE_SUCCESS || (!hasEpset)) { // no valid epset, return directly without redoAction
+ terrno = code;
+ taosMemoryFree(pReq);
+ return -1;
+ }
+
+ // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
+ code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0);
+ if (code != 0) {
+ taosMemoryFree(pReq);
+ return -1;
+ }
+
+ return 0;
+}
+
+int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* pList) {
+ for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
+ SOrphanTask* pTask = taosArrayGet(pList, i);
+ mDebug("add drop task:0x%x action to drop orphan task", pTask->taskId);
+ doSetDropActionFromId(pMnode, pTrans, pTask);
+ }
+ return 0;
+}
+
+static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId,
+ int32_t transId) {
+ pMsg->streamId = pId->streamId;
+ pMsg->taskId = pId->taskId;
+ pMsg->transId = transId;
+ pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
+ taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList);
+}
+
+static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
+ SStreamTaskId *pId, int32_t transId) {
+ SStreamTaskNodeUpdateMsg req = {0};
+ initNodeUpdateMsg(&req, pInfo, pId, transId);
+
+ int32_t code = 0;
+ int32_t blen;
+
+ tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code);
+ if (code < 0) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ taosArrayDestroy(req.pNodeList);
+ return -1;
+ }
+
+ int32_t tlen = sizeof(SMsgHead) + blen;
+
+ void *buf = taosMemoryMalloc(tlen);
+ if (buf == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ taosArrayDestroy(req.pNodeList);
+ return -1;
+ }
+
+ void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
+ SEncoder encoder;
+ tEncoderInit(&encoder, abuf, tlen);
+ tEncodeStreamTaskUpdateMsg(&encoder, &req);
+
+ SMsgHead *pMsgHead = (SMsgHead *)buf;
+ pMsgHead->contLen = htonl(tlen);
+ pMsgHead->vgId = htonl(nodeId);
+
+ tEncoderClear(&encoder);
+
+ *pBuf = buf;
+ *pLen = tlen;
+
+ taosArrayDestroy(req.pNodeList);
+ return TSDB_CODE_SUCCESS;
+}
+
+static int32_t doSetUpdateTaskAction(STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) {
+ void *pBuf = NULL;
+ int32_t len = 0;
+ streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
+
+ doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
+
+ int32_t code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet, 0);
+ if (code != TSDB_CODE_SUCCESS) {
+ taosMemoryFree(pBuf);
+ }
+
+ return code;
+}
+
+// build trans to update the epset
+int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
+ mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);
+ taosWLockLatch(&pStream->lock);
+
+ SStreamTaskIter *pIter = createStreamTaskIter(pStream);
+ while (streamTaskIterNextTask(pIter)) {
+ SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
+ int32_t code = doSetUpdateTaskAction(pTrans, pTask, pInfo);
+ if (code != TSDB_CODE_SUCCESS) {
+ destroyStreamTaskIter(pIter);
+ taosWUnLockLatch(&pStream->lock);
+ return -1;
+ }
+ }
+
+ destroyStreamTaskIter(pIter);
+ taosWUnLockLatch(&pStream->lock);
+ return 0;
+}
+
+static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
+ SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
+ if (pReq == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq),
+ tstrerror(TSDB_CODE_OUT_OF_MEMORY));
+ return terrno;
+ }
+
+ pReq->head.vgId = htonl(pTask->info.nodeId);
+ pReq->taskId = pTask->id.taskId;
+ pReq->streamId = pTask->id.streamId;
+
+ SEpSet epset = {0};
+ bool hasEpset = false;
+ int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
+ if (code != TSDB_CODE_SUCCESS || !hasEpset) {
+ taosMemoryFree(pReq);
+ return code;
+ }
+
+ code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0);
+ if (code != TSDB_CODE_SUCCESS) {
+ taosMemoryFree(pReq);
+ }
+
+ return code;
+}
+
+int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
+ taosWLockLatch(&pStream->lock);
+
+ SStreamTaskIter *pIter = createStreamTaskIter(pStream);
+ while (streamTaskIterNextTask(pIter)) {
+ SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
+ int32_t code = doSetResetAction(pMnode, pTrans, pTask);
+ if (code != TSDB_CODE_SUCCESS) {
+ destroyStreamTaskIter(pIter);
+ taosWUnLockLatch(&pStream->lock);
+ return -1;
+ }
+ }
+
+ destroyStreamTaskIter(pIter);
+ taosWUnLockLatch(&pStream->lock);
+ return 0;
+}
diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c
index 9075f0145c..1adc4ed4bf 100644
--- a/source/dnode/mnode/impl/src/mndTrans.c
+++ b/source/dnode/mnode/impl/src/mndTrans.c
@@ -599,6 +599,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
pTrans->originRpcType = pReq->msgType;
}
+ mInfo("trans:%d, create transaction:%s, origin:%s", pTrans->id, pTrans->opername, opername);
+
mTrace("trans:%d, local object is created, data:%p", pTrans->id, pTrans);
return pTrans;
}
@@ -845,6 +847,8 @@ int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans) {
}
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
+ if(pTrans == NULL) return -1;
+
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
return -1;
}
diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c
index 0e3b544508..5e5a3626a4 100644
--- a/source/dnode/mnode/impl/src/mndUser.c
+++ b/source/dnode/mnode/impl/src/mndUser.c
@@ -1925,6 +1925,7 @@ static int32_t mndProcessAlterUserPrivilegesReq(SAlterUserReq *pAlterReq, SMnode
return -1;
}
taosHashPut(pNewUser->topics, pTopic->name, len, pTopic->name, TSDB_TOPIC_FNAME_LEN);
+ mndReleaseTopic(pMnode, pTopic);
}
if (ALTER_USER_DEL_SUBSCRIBE_TOPIC_PRIV(pAlterReq->alterType, pAlterReq->privileges)) {
@@ -1935,6 +1936,7 @@ static int32_t mndProcessAlterUserPrivilegesReq(SAlterUserReq *pAlterReq, SMnode
return -1;
}
taosHashRemove(pNewUser->topics, pAlterReq->objname, len);
+ mndReleaseTopic(pMnode, pTopic);
}
return TSDB_CODE_SUCCESS;
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index a689932754..1ade1c8c41 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -835,8 +835,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
// checkpoint ver is the kept version, handled data should be the next version.
- if (pTask->chkInfo.checkpointId != 0) {
- pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1;
+ if (pChkInfo->checkpointId != 0) {
+ pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
}
diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c
index 3472f4e14d..fc0589031a 100644
--- a/source/libs/executor/src/executor.c
+++ b/source/libs/executor/src/executor.c
@@ -1080,7 +1080,7 @@ bool qStreamScanhistoryFinished(qTaskInfo_t tinfo) {
int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
- STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow;
+ STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow;
qDebug("%s remove timeWindow filter:%" PRId64 "-%" PRId64 ", set new window:%" PRId64 "-%" PRId64,
GET_TASKID(pTaskInfo), pWindow->skey, pWindow->ekey, INT64_MIN, INT64_MAX);
diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c
index ff4d3d0d27..76dc622cfd 100644
--- a/source/libs/executor/src/executorInt.c
+++ b/source/libs/executor/src/executorInt.c
@@ -311,6 +311,7 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int
pInput->totalRows = pBlock->info.rows;
pInput->numOfRows = pBlock->info.rows;
pInput->startRowIndex = 0;
+ pInput->blankFill = pBlock->info.blankFill;
// NOTE: the last parameter is the primary timestamp column
// todo: refactor this
@@ -325,6 +326,7 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int
pInput->totalRows = pBlock->info.rows;
pInput->numOfRows = pBlock->info.rows;
pInput->startRowIndex = 0;
+ pInput->blankFill = pBlock->info.blankFill;
code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) {
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index 9e48bbb214..bb01836881 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -663,6 +663,8 @@ static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, i
pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1);
+ pInfo->pResBlock->info.blankFill = false;
+
if (!pInfo->needCountEmptyTable) {
pInfo->countState = TABLE_COUNT_STATE_END;
} else {
@@ -687,6 +689,7 @@ static SSDataBlock* getOneRowResultBlock(SExecTaskInfo* pTaskInfo, STableScanBas
pBlock->info.rows = 1;
pBlock->info.id.uid = tbInfo->uid;
pBlock->info.id.groupId = tbInfo->groupId;
+ pBlock->info.blankFill = true;
// only one row: set all col data to null & hasNull
int32_t col_num = blockDataGetNumOfCols(pBlock);
@@ -696,7 +699,7 @@ static SSDataBlock* getOneRowResultBlock(SExecTaskInfo* pTaskInfo, STableScanBas
}
// set tag/tbname
- doSetTagColumnData(pBase, pBlock, pTaskInfo, pBlock->info.rows);
+ doSetTagColumnData(pBase, pBlock, pTaskInfo, 1);
return pBlock;
}
diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c
index db9266cb8f..3559e57ebc 100644
--- a/source/libs/executor/src/tsort.c
+++ b/source/libs/executor/src/tsort.c
@@ -1334,7 +1334,6 @@ static bool tsortOpenForBufMergeSort(SSortHandle* pHandle) {
int32_t tsortClose(SSortHandle* pHandle) {
atomic_val_compare_exchange_8(&pHandle->closed, 0, 1);
- taosMsleep(10);
return TSDB_CODE_SUCCESS;
}
diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c
index 000f634fe5..5ab6d5e075 100644
--- a/source/libs/function/src/builtinsimpl.c
+++ b/source/libs/function/src/builtinsimpl.c
@@ -499,6 +499,9 @@ static int64_t getNumOfElems(SqlFunctionCtx* pCtx) {
*/
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
+ if(1 == pInput->numOfRows && pInput->blankFill) {
+ return 0;
+ }
if (pInput->colDataSMAIsSet && pInput->totalRows == pInput->numOfRows) {
numOfElem = pInput->numOfRows - pInput->pColumnDataAgg[0]->numOfNull;
} else {
@@ -6022,7 +6025,7 @@ int32_t groupKeyFunction(SqlFunctionCtx* pCtx) {
goto _group_key_over;
}
- if (colDataIsNull_s(pInputCol, startIndex)) {
+ if (pInputCol->pData == NULL || colDataIsNull_s(pInputCol, startIndex)) {
pInfo->isNull = true;
pInfo->hasResult = true;
goto _group_key_over;
diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c
index 1994ddb437..512dfdaef2 100644
--- a/source/libs/parser/src/parInsertSql.c
+++ b/source/libs/parser/src/parInsertSql.c
@@ -440,14 +440,14 @@ static int32_t parseVarbinary(SToken* pToken, uint8_t **pData, uint32_t *nData,
return TSDB_CODE_PAR_INVALID_VARBINARY;
}
- if(isHex(pToken->z, pToken->n)){
- if(!isValidateHex(pToken->z, pToken->n)){
+ if(isHex(pToken->z + 1, pToken->n - 2)){
+ if(!isValidateHex(pToken->z + 1, pToken->n - 2)){
return TSDB_CODE_PAR_INVALID_VARBINARY;
}
void* data = NULL;
uint32_t size = 0;
- if(taosHex2Ascii(pToken->z, pToken->n, &data, &size) < 0){
+ if(taosHex2Ascii(pToken->z + 1, pToken->n - 2, &data, &size) < 0){
return TSDB_CODE_OUT_OF_MEMORY;
}
@@ -458,11 +458,13 @@ static int32_t parseVarbinary(SToken* pToken, uint8_t **pData, uint32_t *nData,
*pData = data;
*nData = size;
}else{
- if (pToken->n + VARSTR_HEADER_SIZE > bytes) {
+ *pData = taosMemoryCalloc(1, pToken->n);
+ int32_t len = trimString(pToken->z, pToken->n, *pData, pToken->n);
+ *nData = len;
+
+ if (*nData + VARSTR_HEADER_SIZE > bytes) {
return TSDB_CODE_PAR_VALUE_TOO_LONG;
}
- *pData = taosStrdup(pToken->z);
- *nData = pToken->n;
}
return TSDB_CODE_SUCCESS;
}
@@ -753,7 +755,7 @@ static int32_t buildCreateTbReq(SVnodeModifyOpStmt* pStmt, STag* pTag, SArray* p
return TSDB_CODE_SUCCESS;
}
-static int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
+static int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf, int8_t type) {
if ((pToken->type != TK_NOW && pToken->type != TK_TODAY && pToken->type != TK_NK_INTEGER &&
pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT && pToken->type != TK_NK_BOOL &&
pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT &&
@@ -763,7 +765,7 @@ static int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMs
}
// Remove quotation marks
- if (TK_NK_STRING == pToken->type) {
+ if (TK_NK_STRING == pToken->type && type != TSDB_DATA_TYPE_VARBINARY) {
if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z);
}
@@ -935,7 +937,7 @@ static int32_t parseTagsClauseImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt
SSchema* pTagSchema = &pSchema[pCxt->tags.pColIndex[i]];
isJson = pTagSchema->type == TSDB_DATA_TYPE_JSON;
- code = checkAndTrimValue(&token, pCxt->tmpTokenBuf, &pCxt->msg);
+ code = checkAndTrimValue(&token, pCxt->tmpTokenBuf, &pCxt->msg, pTagSchema->type);
if (TK_NK_VARIABLE == token.type) {
code = buildSyntaxErrMsg(&pCxt->msg, "not expected tags values ", token.z);
}
@@ -1631,7 +1633,7 @@ static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql,
static int32_t parseValueToken(SInsertParseContext* pCxt, const char** pSql, SToken* pToken, SSchema* pSchema,
int16_t timePrec, SColVal* pVal) {
- int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg);
+ int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, pSchema->type);
if (TSDB_CODE_SUCCESS == code && isNullValue(pSchema->type, pToken)) {
if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
return buildSyntaxErrMsg(&pCxt->msg, "primary timestamp should not be null", pToken->z);
@@ -1691,7 +1693,7 @@ typedef union SRowsDataContext{
static int32_t parseTbnameToken(SInsertParseContext* pCxt, SStbRowsDataContext* pStbRowsCxt, SToken* pToken, bool* pFoundCtbName) {
*pFoundCtbName = false;
- int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg);
+ int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, TSDB_DATA_TYPE_BINARY);
if (TK_NK_VARIABLE == pToken->type) {
code = buildInvalidOperationMsg(&pCxt->msg, "not expected tbname");
}
@@ -1731,7 +1733,7 @@ static int32_t processCtbTagsAfterCtbName(SInsertParseContext* pCxt, SVnodeModif
for (int32_t i = 0; code == TSDB_CODE_SUCCESS && i < numOfTagTokens; ++i) {
SToken* pTagToken = (SToken*)(tagTokens + i);
SSchema* pTagSchema = tagSchemas[i];
- code = checkAndTrimValue(pTagToken, pCxt->tmpTokenBuf, &pCxt->msg);
+ code = checkAndTrimValue(pTagToken, pCxt->tmpTokenBuf, &pCxt->msg, pTagSchema->type);
if (TK_NK_VARIABLE == pTagToken->type) {
code = buildInvalidOperationMsg(&pCxt->msg, "not expected tag");
}
@@ -1790,7 +1792,7 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
tagSchemas[(*pNumOfTagTokens)] = (SSchema*)pTagSchema;
++(*pNumOfTagTokens);
} else {
- code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg);
+ code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, pTagSchema->type);
if (TK_NK_VARIABLE == pToken->type) {
code = buildInvalidOperationMsg(&pCxt->msg, "not expected row value");
}
diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index 27748c84a0..b0170d5083 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -390,6 +390,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
pTimeWindow->skey = INT64_MIN;
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
+ stDebug("s-task:%s after exceed the threshold:%" PRId64 " and then update the window filter",
+ pStreamTask->id.idStr, pStreamTask->dataRange.range.maxVer);
} else {
stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr);
}
@@ -400,7 +402,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// 3. send msg to mnode to launch a checkpoint to keep the state for current stream
streamTaskSendCheckpointReq(pStreamTask);
-// streamTaskResume(pStreamTask);
// 4. assign the status to the value that will be kept in disk
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state;
@@ -777,6 +778,8 @@ int32_t streamResumeTask(SStreamTask* pTask) {
while (1) {
/*int32_t code = */ doStreamExecTask(pTask);
+
+ // check if continue
taosThreadMutexLock(&pTask->lock);
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index db71b56815..5e53a921b9 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -1312,28 +1312,28 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
}
void streamMetaRLock(SStreamMeta* pMeta) {
- stTrace("vgId:%d meta-rlock", pMeta->vgId);
+// stTrace("vgId:%d meta-rlock", pMeta->vgId);
taosThreadRwlockRdlock(&pMeta->lock);
}
void streamMetaRUnLock(SStreamMeta* pMeta) {
- stTrace("vgId:%d meta-runlock", pMeta->vgId);
+// stTrace("vgId:%d meta-runlock", pMeta->vgId);
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code);
} else {
- stDebug("vgId:%d meta-runlock completed", pMeta->vgId);
+// stTrace("vgId:%d meta-runlock completed", pMeta->vgId);
}
}
void streamMetaWLock(SStreamMeta* pMeta) {
- stTrace("vgId:%d meta-wlock", pMeta->vgId);
+// stTrace("vgId:%d meta-wlock", pMeta->vgId);
taosThreadRwlockWrlock(&pMeta->lock);
- stTrace("vgId:%d meta-wlock completed", pMeta->vgId);
+// stTrace("vgId:%d meta-wlock completed", pMeta->vgId);
}
void streamMetaWUnLock(SStreamMeta* pMeta) {
- stTrace("vgId:%d meta-wunlock", pMeta->vgId);
+// stTrace("vgId:%d meta-wunlock", pMeta->vgId);
taosThreadRwlockUnlock(&pMeta->lock);
}
diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c
index 20fdcff7d9..2f5bca8ed9 100644
--- a/source/libs/stream/src/streamStart.c
+++ b/source/libs/stream/src/streamStart.c
@@ -385,7 +385,6 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
-
streamTaskOnHandleEventSuccess(pTask->status.pSM, event);
int64_t initTs = pTask->execInfo.init;
@@ -989,4 +988,3 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
streamSetParamForStreamScannerStep2(pTask, &verRange, &win);
}
}
-
diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c
index ea20f0e2b1..e370312338 100644
--- a/source/libs/stream/src/streamState.c
+++ b/source/libs/stream/src/streamState.c
@@ -670,7 +670,6 @@ void streamStateFreeCur(SStreamStateCur* pCur) {
if (!pCur) {
return;
}
- qDebug("streamStateFreeCur");
streamStateResetCur(pCur);
taosMemoryFree(pCur);
}
diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c
index 362d38b505..b324ca5f91 100644
--- a/source/libs/transport/src/transSvr.c
+++ b/source/libs/transport/src/transSvr.c
@@ -159,7 +159,7 @@ static void uvStartSendResp(SSvrMsg* msg);
static void uvNotifyLinkBrokenToApp(SSvrConn* conn);
-static FORCE_INLINE void destroySmsg(SSvrMsg* smsg);
+static FORCE_INLINE void destroySmsg(SSvrMsg* smsg);
static FORCE_INLINE SSvrConn* createConn(void* hThrd);
static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn);
@@ -1382,7 +1382,7 @@ void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) {
tFreeSUpdateIpWhiteReq(req);
taosMemoryFree(req);
} else {
- tInfo("ip-white-list disable on trans");
+ tDebug("ip-white-list disable on trans");
thrd->enableIpWhiteList = 0;
}
taosMemoryFree(msg);
diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c
index 1dfdd637b6..2cc13be6ba 100644
--- a/source/util/src/tqueue.c
+++ b/source/util/src/tqueue.c
@@ -159,6 +159,7 @@ void taosFreeQitem(void *pItem) {
int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
int32_t code = 0;
STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode));
+ pNode->timestamp = taosGetTimestampUs();
pNode->next = NULL;
taosThreadMutexLock(&queue->mutex);
@@ -464,6 +465,7 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *
qinfo->ahandle = queue->ahandle;
qinfo->fp = queue->itemsFp;
qinfo->queue = queue;
+ qinfo->timestamp = queue->head->timestamp;
queue->head = NULL;
queue->tail = NULL;
@@ -489,8 +491,8 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *
int32_t taosQallItemSize(STaosQall *qall) { return qall->numOfItems; }
int64_t taosQallMemSize(STaosQall *qall) { return qall->memOfItems; }
-int64_t taosQallUnAccessedItemSize(STaosQall *qall) {return qall->unAccessedNumOfItems;}
-int64_t taosQallUnAccessedMemSize(STaosQall *qall) {return qall->unAccessMemOfItems;}
+int64_t taosQallUnAccessedItemSize(STaosQall *qall) { return qall->unAccessedNumOfItems; }
+int64_t taosQallUnAccessedMemSize(STaosQall *qall) { return qall->unAccessMemOfItems; }
void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c
index 57dc60e539..c4b3271c65 100644
--- a/source/util/src/tworker.c
+++ b/source/util/src/tworker.c
@@ -15,10 +15,12 @@
#define _DEFAULT_SOURCE
#include "tworker.h"
-#include "tgeosctx.h"
#include "taoserror.h"
+#include "tgeosctx.h"
#include "tlog.h"
+#define QUEUE_THRESHOLD 1000 * 1000
+
typedef void *(*ThreadFp)(void *param);
int32_t tQWorkerInit(SQWorkerPool *pool) {
@@ -84,6 +86,13 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) {
break;
}
+ if (qinfo.timestamp != 0) {
+ int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
+ if (cost > QUEUE_THRESHOLD) {
+ uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost);
+ }
+ }
+
if (qinfo.fp != NULL) {
qinfo.workerId = worker->id;
qinfo.threadNum = pool->num;
@@ -198,6 +207,13 @@ static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
break;
}
+ if (qinfo.timestamp != 0) {
+ int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
+ if (cost > QUEUE_THRESHOLD) {
+ uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost);
+ }
+ }
+
if (qinfo.fp != NULL) {
qinfo.workerId = worker->id;
qinfo.threadNum = taosArrayGetSize(pool->workers);
@@ -221,7 +237,7 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
int32_t queueNum = taosGetQueueNumber(pool->qset);
int32_t curWorkerNum = taosArrayGetSize(pool->workers);
int32_t dstWorkerNum = ceilf(queueNum * pool->ratio);
- if (dstWorkerNum < 1) dstWorkerNum = 1;
+ if (dstWorkerNum < 2) dstWorkerNum = 2;
// spawn a thread to process queue
while (curWorkerNum < dstWorkerNum) {
@@ -338,6 +354,13 @@ static void *tWWorkerThreadFp(SWWorker *worker) {
break;
}
+ if (qinfo.timestamp != 0) {
+ int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
+ if (cost > QUEUE_THRESHOLD) {
+ uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost);
+ }
+ }
+
if (qinfo.fp != NULL) {
qinfo.workerId = worker->id;
qinfo.threadNum = pool->num;
diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task
index d932529d0a..91a0ac46e5 100644
--- a/tests/parallel_test/cases.task
+++ b/tests/parallel_test/cases.task
@@ -230,6 +230,7 @@ fi
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -i True
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -n 3 -i True
+,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform.py -N 2 -n 1
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-stb.py -N 2 -n 1
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-stb.py -N 6 -n 3
#,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-db.py -N 6 -n 3
@@ -292,6 +293,7 @@ fi
,,n,system-test,python3 ./test.py -f 0-others/timeRangeWise.py -N 3
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/delete_check.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/test_hot_refresh_configurations.py
+,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/subscribe_stream_privilege.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/insert_double.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py
diff --git a/tests/system-test/0-others/subscribe_stream_privilege.py b/tests/system-test/0-others/subscribe_stream_privilege.py
new file mode 100644
index 0000000000..b477af9f57
--- /dev/null
+++ b/tests/system-test/0-others/subscribe_stream_privilege.py
@@ -0,0 +1,184 @@
+###################################################################
+# Copyright (c) 2016 by TAOS Technologies, Inc.
+# All rights reserved.
+#
+# This file is proprietary and confidential to TAOS Technologies.
+# No part of this file may be reproduced, stored, transmitted,
+# disclosed or used in any form or by any means other than as
+# expressly provided by the written permission from Jianhui Tao
+#
+###################################################################
+
+# -*- coding: utf-8 -*-
+import time
+
+import taos
+from taos.tmq import *
+from util.cases import *
+from util.common import *
+from util.log import *
+from util.sql import *
+from util.sqlset import *
+
+
+class TDTestCase:
+ clientCfgDict = {'debugFlag': 135}
+ updatecfgDict = {'debugFlag': 143, 'clientCfg':clientCfgDict}
+ def init(self, conn, logSql, replicaVar=1):
+ self.replicaVar = int(replicaVar)
+ tdLog.debug("start to execute %s" % __file__)
+ tdSql.init(conn.cursor())
+ self.setsql = TDSetSql()
+ self.stbname = 'stb'
+ self.user_name = 'test'
+ self.binary_length = 20 # the length of binary for column_dict
+ self.nchar_length = 20 # the length of nchar for column_dict
+ self.dbnames = ['db1']
+ self.column_dict = {
+ 'ts': 'timestamp',
+ 'col1': 'float',
+ 'col2': 'int',
+ 'col3': 'float',
+ }
+
+ self.tag_dict = {
+ 't1': 'int',
+ 't2': f'binary({self.binary_length})'
+ }
+
+ self.tag_list = [
+ f'1, "Beijing"',
+ f'2, "Shanghai"',
+ f'3, "Guangzhou"',
+ f'4, "Shenzhen"'
+ ]
+
+ self.values_list = [
+ f'now, 9.1, 200, 0.3'
+ ]
+
+ self.tbnum = 4
+ self.topic_name = 'topic1'
+
+
+ def prepare_data(self):
+ for db in self.dbnames:
+ tdSql.execute(f"create database {db} vgroups 1")
+ tdSql.execute(f"use {db}")
+ tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict))
+ for i in range(self.tbnum):
+ tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})')
+ for j in self.values_list:
+ tdSql.execute(f'insert into {self.stbname}_{i} values({j})')
+
+ def checkUserPrivileges(self, rowCnt):
+ tdSql.query("show user privileges")
+ tdSql.checkRows(rowCnt)
+
+ def streamTest(self):
+ tdSql.execute("create stream s1 trigger at_once fill_history 1 into so1 as select ts,abs(col2) from stb partition by tbname")
+ time.sleep(2)
+ tdSql.query("select * from so1")
+ tdSql.checkRows(4)
+ tdSql.execute("insert into stb_0(ts,col2) values(now, 332)")
+ time.sleep(2)
+ tdSql.query("select * from so1")
+ tdSql.checkRows(5)
+
+ time.sleep(2)
+ tdSql.query("select * from information_schema.ins_stream_tasks")
+ tdSql.checkData(0, 5, 'ready')
+
+ print(time.time())
+ while 1:
+ t = time.time()
+ if t > 1706254434 :
+ break
+ else:
+ print("time:%d" %(t))
+ time.sleep(1)
+
+
+ tdSql.error("create stream s11 trigger at_once fill_history 1 into so1 as select ts,abs(col2) from stb partition by tbname")
+
+ time.sleep(10)
+ tdSql.query("select * from information_schema.ins_stream_tasks")
+ tdSql.checkData(0, 5, 'paused')
+ tdSql.execute("insert into stb_0(ts,col2) values(now, 3232)")
+ tdSql.query("select * from so1")
+ tdSql.checkRows(5)
+
+ tdSql.error("resume stream s1")
+
+ def consumeTest(self):
+ consumer_dict = {
+ "group.id": "g1",
+ "td.connect.user": self.user_name,
+ "td.connect.pass": "test",
+ "auto.offset.reset": "earliest"
+ }
+ consumer = Consumer(consumer_dict)
+
+ tdLog.debug("test subscribe topic created by other user")
+ exceptOccured = False
+ try:
+ consumer.subscribe([self.topic_name])
+ except TmqError:
+ exceptOccured = True
+
+ if not exceptOccured:
+ tdLog.exit(f"has no privilege, should except")
+
+ self.checkUserPrivileges(1)
+ tdLog.debug("test subscribe topic privilege granted by other user")
+ tdSql.execute(f'grant subscribe on {self.topic_name} to {self.user_name}')
+ self.checkUserPrivileges(2)
+
+ exceptOccured = False
+ try:
+ consumer.subscribe([self.topic_name])
+ except TmqError:
+ exceptOccured = True
+
+ if exceptOccured:
+ tdLog.exit(f"has privilege, should not except")
+
+ cnt = 0
+ try:
+ while True:
+ res = consumer.poll(1)
+ cnt += 1
+ if cnt == 1:
+ if not res:
+ tdLog.exit(f"grant privilege, should get res")
+ elif cnt == 2:
+ if res:
+ tdLog.exit(f"revoke privilege, should get NULL")
+ else:
+ break
+
+ tdLog.debug("test subscribe topic privilege revoked by other user")
+ tdSql.execute(f'revoke subscribe on {self.topic_name} from {self.user_name}')
+ self.checkUserPrivileges(1)
+ time.sleep(5)
+
+ finally:
+ consumer.close()
+
+ def create_user(self):
+ tdSql.execute(f'create topic {self.topic_name} as database {self.dbnames[0]}')
+ tdSql.execute(f'create user {self.user_name} pass "test"')
+
+ def run(self):
+ self.prepare_data()
+ self.create_user()
+ self.consumeTest()
+ # self.streamTest()
+
+ def stop(self):
+ tdSql.close()
+ tdLog.success("%s successfully executed" % __file__)
+
+
+tdCases.addWindows(__file__, TDTestCase())
+tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
diff --git a/tests/system-test/7-tmq/tmqDropConsumer.py b/tests/system-test/7-tmq/tmqDropConsumer.py
index 5208d14069..953e9314f1 100644
--- a/tests/system-test/7-tmq/tmqDropConsumer.py
+++ b/tests/system-test/7-tmq/tmqDropConsumer.py
@@ -12,7 +12,8 @@ sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
- # updatecfgDict = {'debugFlag': 135}
+ clientCfgDict = {'debugFlag': 135}
+ updatecfgDict = {'debugFlag': 135, 'clientCfg':clientCfgDict}
def __init__(self):
self.vgroups = 2
diff --git a/tests/system-test/7-tmq/tmqParamsTest.py b/tests/system-test/7-tmq/tmqParamsTest.py
index 0e9e8f989f..9286b69278 100644
--- a/tests/system-test/7-tmq/tmqParamsTest.py
+++ b/tests/system-test/7-tmq/tmqParamsTest.py
@@ -11,7 +11,9 @@ sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
- updatecfgDict = {'debugFlag': 135}
+ clientCfgDict = {'debugFlag': 135}
+ updatecfgDict = {'debugFlag': 135, 'clientCfg':clientCfgDict}
+
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
diff --git a/tests/system-test/7-tmq/tmqVnodeTransform.py b/tests/system-test/7-tmq/tmqVnodeTransform.py
index 811b72c35f..c2b002ead6 100644
--- a/tests/system-test/7-tmq/tmqVnodeTransform.py
+++ b/tests/system-test/7-tmq/tmqVnodeTransform.py
@@ -186,7 +186,7 @@ class TDTestCase:
tmqCom.getStartCommitNotifyFromTmqsim()
#restart dnode & remove wal
- # self.restartAndRemoveWal()
+ self.restartAndRemoveWal()
# redistribute vgroup
self.redistributeVgroups();
@@ -235,7 +235,7 @@ class TDTestCase:
tdSql.execute(sqlString)
tdSql.query("flush database %s"%(paraDict['dbName']))
#restart dnode & remove wal
- # self.restartAndRemoveWal()
+ self.restartAndRemoveWal()
# redistribute vgroup
self.redistributeVgroups();
@@ -313,7 +313,7 @@ class TDTestCase:
time.sleep(5)
#restart dnode & remove wal
- # self.restartAndRemoveWal()
+ self.restartAndRemoveWal()
# redistribute vgroup
self.redistributeVgroups()
diff --git a/utils/test/c/varbinary_test.c b/utils/test/c/varbinary_test.c
index 522a820fe8..47bacf629b 100644
--- a/utils/test/c/varbinary_test.c
+++ b/utils/test/c/varbinary_test.c
@@ -85,7 +85,6 @@ void varbinary_sql_test() {
// test insert
pRes = taos_query(taos, "insert into tb2 using stb tags (2, 'tb2_bin1', 093) values (now + 2s, 'nchar1', 892, 0.3)");
- printf("error:%s", taos_errstr(pRes));
ASSERT(taos_errno(pRes) != 0);
pRes = taos_query(taos, "insert into tb3 using stb tags (3, 'tb3_bin1', 0x7f829) values (now + 3s, 'nchar1', 0x7f829, 0.3)");
@@ -322,6 +321,60 @@ void varbinary_sql_test() {
printf("%s result %s\n", __FUNCTION__, taos_errstr(pRes));
taos_free_result(pRes);
+ // test insert string value '\x'
+ pRes = taos_query(taos, "insert into tb5 using stb tags (5, 'tb5_bin1', '\\\\xg') values (now + 4s, 'nchar1', '\\\\xg', 0.3)");
+ taos_free_result(pRes);
+
+ pRes = taos_query(taos, "select c2,t3 from stb where t3 = '\\x5C7867'");
+ while ((row = taos_fetch_row(pRes)) != NULL) {
+ int32_t* length = taos_fetch_lengths(pRes);
+ void* data = NULL;
+ uint32_t size = 0;
+ if(taosAscii2Hex(row[0], length[0], &data, &size) < 0){
+ ASSERT(0);
+ }
+
+ ASSERT(memcmp(data, "\\x5C7867", size) == 0);
+ taosMemoryFree(data);
+
+ if(taosAscii2Hex(row[1], length[1], &data, &size) < 0){
+ ASSERT(0);
+ }
+
+ ASSERT(memcmp(data, "\\x5C7867", size) == 0);
+ taosMemoryFree(data);
+ }
+ taos_free_result(pRes);
+
+ // test insert
+ char tmp [65517*2+3] = {0};
+ tmp[0] = '\\';
+ tmp[1] = 'x';
+ memset(tmp + 2, 48, 65517*2);
+
+ char sql[65517*2+3 + 256] = {0};
+
+ pRes = taos_query(taos, "create stable stb1 (ts timestamp, c2 varbinary(65517)) tags (t1 int, t2 binary(8), t3 varbinary(8))");
+ taos_free_result(pRes);
+
+ sprintf(sql, "insert into tb6 using stb1 tags (6, 'tb6_bin1', '\\\\xg') values (now + 4s, '%s')", tmp);
+ pRes = taos_query(taos, sql);
+ taos_free_result(pRes);
+
+ pRes = taos_query(taos, "select c2 from tb6");
+ while ((row = taos_fetch_row(pRes)) != NULL) {
+ int32_t* length = taos_fetch_lengths(pRes);
+ void* data = NULL;
+ uint32_t size = 0;
+ if(taosAscii2Hex(row[0], length[0], &data, &size) < 0){
+ ASSERT(0);
+ }
+
+ ASSERT(memcmp(data, tmp, size) == 0);
+ taosMemoryFree(data);
+ }
+ taos_free_result(pRes);
+
taos_close(taos);
}