Merge branch '3.0' of github.com:taosdata/TDengine into szhou/tms/port-3-fe

This commit is contained in:
shenglian zhou 2024-02-01 09:38:54 +08:00
commit 87e2027475
48 changed files with 1096 additions and 508 deletions

View File

@ -12,7 +12,7 @@
[![Build Status](https://travis-ci.org/taosdata/TDengine.svg?branch=master)](https://travis-ci.org/taosdata/TDengine)
[![Build status](https://ci.appveyor.com/api/projects/status/kf3pwh2or5afsgl9/branch/master?svg=true)](https://ci.appveyor.com/project/sangshuduo/tdengine-2n8ge/branch/master)
[![Coverage Status](https://coveralls.io/repos/github/taosdata/TDengine/badge.svg?branch=develop)](https://coveralls.io/github/taosdata/TDengine?branch=develop)
[![Coverage Status](https://coveralls.io/repos/github/taosdata/TDengine/badge.svg?branch=3.0)](https://coveralls.io/github/taosdata/TDengine?branch=3.0)
[![CII Best Practices](https://bestpractices.coreinfrastructure.org/projects/4201/badge)](https://bestpractices.coreinfrastructure.org/projects/4201)
简体中文 | [English](README.md) | [TDengine 云服务](https://cloud.taosdata.com/?utm_medium=cn&utm_source=github) | 很多职位正在热招中,请看[这里](https://www.taosdata.com/cn/careers/)

View File

@ -12,7 +12,7 @@
[![Build Status](https://cloud.drone.io/api/badges/taosdata/TDengine/status.svg?ref=refs/heads/master)](https://cloud.drone.io/taosdata/TDengine)
[![Build status](https://ci.appveyor.com/api/projects/status/kf3pwh2or5afsgl9/branch/master?svg=true)](https://ci.appveyor.com/project/sangshuduo/tdengine-2n8ge/branch/master)
[![Coverage Status](https://coveralls.io/repos/github/taosdata/TDengine/badge.svg?branch=develop)](https://coveralls.io/github/taosdata/TDengine?branch=develop)
[![Coverage Status](https://coveralls.io/repos/github/taosdata/TDengine/badge.svg?branch=3.0)](https://coveralls.io/github/taosdata/TDengine?branch=3.0)
[![CII Best Practices](https://bestpractices.coreinfrastructure.org/projects/4201/badge)](https://bestpractices.coreinfrastructure.org/projects/4201)
<br />
[![Twitter Follow](https://img.shields.io/twitter/follow/tdenginedb?label=TDengine&style=social)](https://twitter.com/tdenginedb)

View File

@ -491,6 +491,8 @@ TO_CHAR(ts, format_str_literal)
**Description**: Convert a ts column to string as the format specified
**Version**: Since ver-3.2.2.0
**Return value type**: VARCHAR
**Applicable column types**: TIMESTAMP
@ -550,6 +552,8 @@ TO_TIMESTAMP(ts_str_literal, format_str_literal)
**Description**: Convert a formated timestamp string to a timestamp
**Version**: Since ver-3.2.2.0
**Return value type**: TIMESTAMP
**Applicable column types**: VARCHAR

View File

@ -7,14 +7,14 @@ description: This document describes the usage of escape characters in TDengine.
| Escape Character | **Actual Meaning** |
| :--------------: | ------------------------ |
| `\'` | Single quote ' |
| `\"` | Double quote " |
| \n | Line Break |
| \r | Carriage Return |
| \t | tab |
| `\\` | Back Slash \ |
| `\%` | % see below for details |
| `\_` | \_ see below for details |
| `\'` | Single quote `'` |
| `\"` | Double quote `"` |
| `\n` | Line Break |
| `\r` | Carriage Return |
| `\t` | tab |
| `\\` | Back Slash `\ ` |
| `\%` | `%` see below for details |
| `\_` | `_` see below for details |
## Restrictions
@ -22,5 +22,5 @@ description: This document describes the usage of escape characters in TDengine.
- Identifier without ``: Error will be returned because identifier must be constituted of digits, ASCII characters or underscore and can't be started with digits
- Identifier quoted with ``: Original content is kept, no escaping
2. If there are escape characters in values
- The escape characters will be escaped as the above table. If the escape character doesn't match any supported one, the escape character "\" will be ignored.
- "%" and "\_" are used as wildcards in `like`. `\%` and `\_` should be used to represent literal "%" and "\_" in `like`,. If `\%` and `\_` are used out of `like` context, the evaluation result is "`\%`"and "`\_`", instead of "%" and "\_".
- The escape characters will be escaped as the above table. If the escape character doesn't match any supported one, the escape character `\ ` will be ignored(`\x` remaining).
- `%` and `_` are used as wildcards in `like`. `\%` and `\_` should be used to represent literal `%` and `_` in `like`. If `\%` and `\_` are used out of `like` context, the evaluation result is `\%` and `\_`, instead of `%` and `_`.

View File

@ -491,6 +491,8 @@ TO_CHAR(ts, format_str_literal)
**功能说明**: 将timestamp类型按照指定格式转换为字符串
**版本**: ver-3.2.2.0
**返回结果数据类型**: VARCHAR
**应用字段**: TIMESTAMP
@ -550,6 +552,8 @@ TO_TIMESTAMP(ts_str_literal, format_str_literal)
**功能说明**: 将字符串按照指定格式转化为时间戳.
**版本**: ver-3.2.2.0
**返回结果数据类型**: TIMESTAMP
**应用字段**: VARCHAR

View File

@ -8,16 +8,15 @@ description: TDengine 中使用转义字符的详细规则
| 字符序列 | **代表的字符** |
| :------: | -------------- |
| `\'` | 单引号' |
| `\"` | 双引号" |
| \n | 换行符 |
| \r | 回车符 |
| \t | tab 符 |
| `\\` | 斜杠\ |
| `\%` | % 规则见下 |
| `\_` | \_ 规则见下 |
| `\'` | 单引号`'` |
| `\"` | 双引号`"` |
| `\n` | 换行符 |
| `\r` | 回车符 |
| `\t` | tab 符 |
| `\\` | 斜杠 `\ ` |
| `\%` | `%` 规则见下 |
| `\_` | `_` 规则见下 |
:::
## 转义字符使用规则
@ -25,5 +24,5 @@ description: TDengine 中使用转义字符的详细规则
1. 普通标识符: 直接提示错误的标识符,因为标识符规定必须是数字、字母和下划线,并且不能以数字开头。
2. 反引号``标识符: 保持原样,不转义
2. 数据里有转义字符
1. 遇到上面定义的转义字符会转义(%和\_见下面说明),如果没有匹配的转义字符会忽略掉转义符\。
2. 对于%和\_因为在 like 里这两个字符是通配符,所以在模式匹配 like 里用`\%`%和`\_`表示字符里本身的%和\_如果在 like 模式匹配上下文之外使用`\%`或`\_`,则它们的计算结果为字符串`\%`和`\_`,而不是%和\_
1. 遇到上面定义的转义字符会转义(`%`和`_`见下面说明),如果没有匹配的转义字符会忽略掉转义符`\ ``\x`保持原样)
2. 对于`%`和`_`,因为在`like`里这两个字符是通配符,所以在模式匹配`like`里用`\%`和`\_`表示字符里本身的`%`和`_`,如果在`like`模式匹配上下文之外使用`\%`或`\_`,则它们的计算结果为字符串`\%`和`\_`,而不是`%`和`_`

View File

@ -206,6 +206,7 @@ typedef struct SDataBlockInfo {
int16_t hasVarCol;
int16_t dataLoad; // denote if the data is loaded or not
uint8_t scanFlag;
bool blankFill;
// TODO: optimize and remove following
int64_t version; // used for stream, and need serialization

View File

@ -48,6 +48,7 @@ typedef enum {
TSDB_GRANT_CPU_CORES,
TSDB_GRANT_STABLE,
TSDB_GRANT_TABLE,
TSDB_GRANT_SUBSCRIBE,
} EGrantType;
int32_t grantCheck(EGrantType grant);

View File

@ -3331,7 +3331,7 @@ typedef struct {
SMsgHead head;
int64_t streamId;
int32_t taskId;
} SVPauseStreamTaskReq, SVResetStreamTaskReq, SVDropHTaskReq;
} SVPauseStreamTaskReq, SVResetStreamTaskReq;
typedef struct {
int8_t reserved;
@ -3754,7 +3754,12 @@ typedef struct {
} SMqHbReq;
typedef struct {
int8_t reserved;
char topic[TSDB_TOPIC_FNAME_LEN];
int8_t noPrivilege;
} STopicPrivilege;
typedef struct {
SArray* topicPrivileges; // SArray<STopicPrivilege>
} SMqHbRsp;
typedef struct {
@ -3773,18 +3778,6 @@ typedef struct {
SVCreateTbReq cTbReq;
} SVSubmitBlk;
typedef struct {
int32_t flags;
int32_t nBlocks;
union {
SArray* pArray;
SVSubmitBlk* pBlocks;
};
} SVSubmitReq;
int32_t tEncodeSVSubmitReq(SEncoder* pCoder, const SVSubmitReq* pReq);
int32_t tDecodeSVSubmitReq(SDecoder* pCoder, SVSubmitReq* pReq);
typedef struct {
SMsgHead header;
uint64_t sId;
@ -3893,6 +3886,10 @@ int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
int32_t tDeatroySMqHbReq(SMqHbReq* pReq);
int32_t tSerializeSMqHbRsp(void* buf, int32_t bufLen, SMqHbRsp* pRsp);
int32_t tDeserializeSMqHbRsp(void* buf, int32_t bufLen, SMqHbRsp* pRsp);
int32_t tDeatroySMqHbRsp(SMqHbRsp* pRsp);
int32_t tSerializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);

View File

@ -114,6 +114,7 @@ typedef struct SInputColumnInfoData {
int32_t totalRows; // total rows in current columnar data
int32_t startRowIndex; // handle started row index
int64_t numOfRows; // the number of rows needs to be handled
bool blankFill; // fill blank data to block for empty table
int32_t numOfInputCols; // PTS is not included
bool colDataSMAIsSet; // if agg is set or not
SColumnInfoData *pPTS; // primary timestamp column

View File

@ -324,12 +324,13 @@ typedef struct SStreamStatus {
int8_t taskStatus;
int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set
int8_t schedStatus;
int32_t schedIdleTime; // idle time before invoke again
int64_t lastExecTs; // last exec time stamp
int8_t statusBackup;
bool appendTranstateBlock; // has append the transfer state data block already
int32_t timerActive; // timer is active
int32_t schedIdleTime; // idle time before invoke again
int32_t timerActive; // timer is active
int64_t lastExecTs; // last exec time stamp
int32_t inScanHistorySentinel;
bool appendTranstateBlock; // has append the transfer state data block already
bool supplementaryWalscan; // complete the supplementary wal scan or not
} SStreamStatus;
typedef struct SDataRange {

View File

@ -406,10 +406,6 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
if (pStmt->bInfo.inExecCache) {
if (ASSERT(taosHashGetSize(pStmt->exec.pBlockHash) == 1)) {
tscError("stmtGetFromCache error");
return TSDB_CODE_TSC_STMT_CACHE_ERROR;
}
pStmt->bInfo.needParse = false;
tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
return TSDB_CODE_SUCCESS;

View File

@ -155,6 +155,7 @@ typedef struct {
char db[TSDB_DB_FNAME_LEN];
SArray* vgs; // SArray<SMqClientVg>
SSchemaWrapper schema;
int8_t noPrivilege;
} SMqClientTopic;
typedef struct {
@ -739,6 +740,29 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
if (pMsg) {
SMqHbRsp rsp = {0};
tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp);
int64_t refId = *(int64_t*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq != NULL) {
taosWLockLatch(&tmq->lock);
for(int32_t i = 0; i < taosArrayGetSize(rsp.topicPrivileges); i++){
STopicPrivilege* privilege = taosArrayGet(rsp.topicPrivileges, i);
if(privilege->noPrivilege == 1){
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
for (int32_t j = 0; j < topicNumCur; j++) {
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
if(strcmp(pTopicCur->topicName, privilege->topic) == 0){
tscInfo("consumer:0x%" PRIx64 ", has no privilege, topic:%s", tmq->consumerId, privilege->topic);
pTopicCur->noPrivilege = 1;
}
}
}
}
taosWUnLockLatch(&tmq->lock);
}
tDeatroySMqHbRsp(&rsp);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
}
@ -809,7 +833,9 @@ void tmqSendHbReq(void* param, void* tmrId) {
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
sendInfo->param = NULL;
sendInfo->paramFreeFp = taosMemoryFree;
sendInfo->param = taosMemoryMalloc(sizeof(int64_t));
*(int64_t *)sendInfo->param = refId;
sendInfo->fp = tmqHbCb;
sendInfo->msgType = TDMT_MND_TMQ_HB;
@ -1705,7 +1731,10 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
for (int i = 0; i < numOfTopics; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
int32_t numOfVg = taosArrayGetSize(pTopic->vgs);
if(pTopic->noPrivilege){
tscDebug("consumer:0x%" PRIx64 " has no privilegr for topic:%s", tmq->consumerId, pTopic->topicName);
continue;
}
for (int j = 0; j < numOfVg; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 10ms

View File

@ -58,7 +58,7 @@ int32_t tsNumOfMnodeQueryThreads = 4;
int32_t tsNumOfMnodeFetchThreads = 1;
int32_t tsNumOfMnodeReadThreads = 1;
int32_t tsNumOfVnodeQueryThreads = 4;
float tsRatioOfVnodeStreamThreads = 1.0;
float tsRatioOfVnodeStreamThreads = 0.5F;
int32_t tsNumOfVnodeFetchThreads = 4;
int32_t tsNumOfVnodeRsmaThreads = 2;
int32_t tsNumOfQnodeQueryThreads = 4;
@ -622,7 +622,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
0)
return -1;
if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 10, CFG_SCOPE_SERVER,
if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 4, CFG_SCOPE_SERVER,
CFG_DYN_NONE) != 0)
return -1;

View File

@ -18,6 +18,6 @@
#ifndef _GRANT
int32_t grantCheck(EGrantType grant) { return TSDB_CODE_SUCCESS; }
int32_t grantCheck(EGrantType grant) {return TSDB_CODE_SUCCESS;}
#endif

View File

@ -6139,6 +6139,55 @@ int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) {
return 0;
}
int32_t tDeatroySMqHbRsp(SMqHbRsp *pRsp) {
taosArrayDestroy(pRsp->topicPrivileges);
return 0;
}
int32_t tSerializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
int32_t sz = taosArrayGetSize(pRsp->topicPrivileges);
if (tEncodeI32(&encoder, sz) < 0) return -1;
for (int32_t i = 0; i < sz; ++i) {
STopicPrivilege *privilege = (STopicPrivilege *)taosArrayGet(pRsp->topicPrivileges, i);
if (tEncodeCStr(&encoder, privilege->topic) < 0) return -1;
if (tEncodeI8(&encoder, privilege->noPrivilege) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
int32_t sz = 0;
if (tDecodeI32(&decoder, &sz) < 0) return -1;
if (sz > 0) {
pRsp->topicPrivileges = taosArrayInit(sz, sizeof(STopicPrivilege));
if (NULL == pRsp->topicPrivileges) return -1;
for (int32_t i = 0; i < sz; ++i) {
STopicPrivilege *data = taosArrayReserve(pRsp->topicPrivileges, 1);
if (tDecodeCStrTo(&decoder, data->topic) < 0) return -1;
if (tDecodeI8(&decoder, &data->noPrivilege) < 0) return -1;
}
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tDeatroySMqHbReq(SMqHbReq *pReq) {
for (int i = 0; i < taosArrayGetSize(pReq->topics); i++) {
TopicOffsetRows *vgs = taosArrayGet(pReq->topics, i);
@ -6194,7 +6243,7 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
if (NULL == pReq->topics) return -1;
for (int32_t i = 0; i < sz; ++i) {
TopicOffsetRows *data = taosArrayReserve(pReq->topics, 1);
tDecodeCStrTo(&decoder, data->topicName);
if (tDecodeCStrTo(&decoder, data->topicName) < 0) return -1;
int32_t szVgs = 0;
if (tDecodeI32(&decoder, &szVgs) < 0) return -1;
if (szVgs > 0) {
@ -7753,36 +7802,6 @@ static int32_t tDecodeSVSubmitBlk(SDecoder *pCoder, SVSubmitBlk *pBlock, int32_t
return 0;
}
int32_t tEncodeSVSubmitReq(SEncoder *pCoder, const SVSubmitReq *pReq) {
int32_t nBlocks = taosArrayGetSize(pReq->pArray);
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI32v(pCoder, pReq->flags) < 0) return -1;
if (tEncodeI32v(pCoder, nBlocks) < 0) return -1;
for (int32_t iBlock = 0; iBlock < nBlocks; iBlock++) {
if (tEncodeSVSubmitBlk(pCoder, (SVSubmitBlk *)taosArrayGet(pReq->pArray, iBlock), pReq->flags) < 0) return -1;
}
tEndEncode(pCoder);
return 0;
}
int32_t tDecodeSVSubmitReq(SDecoder *pCoder, SVSubmitReq *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI32v(pCoder, &pReq->flags) < 0) return -1;
if (tDecodeI32v(pCoder, &pReq->nBlocks) < 0) return -1;
pReq->pBlocks = tDecoderMalloc(pCoder, sizeof(SVSubmitBlk) * pReq->nBlocks);
if (pReq->pBlocks == NULL) return -1;
for (int32_t iBlock = 0; iBlock < pReq->nBlocks; iBlock++) {
if (tDecodeSVSubmitBlk(pCoder, pReq->pBlocks + iBlock, pReq->flags) < 0) return -1;
}
tEndDecode(pCoder);
return 0;
}
static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBlock) {
if (tStartEncode(pEncoder) < 0) return -1;

View File

@ -30,7 +30,6 @@ int32_t mndCheckDbPrivilege(SMnode *pMnode, const char *user, EOperType operType
int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *dbname);
int32_t mndCheckViewPrivilege(SMnode *pMnode, const char *user, EOperType operType, const char *pViewFName);
int32_t mndCheckTopicPrivilege(SMnode *pMnode, const char *user, EOperType operType, SMqTopicObj *pTopic);
int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *topicName);
int32_t mndCheckShowPrivilege(SMnode *pMnode, const char *user, EShowType showType, const char *dbname);
int32_t mndCheckAlterUserPrivilege(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter);
int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp);

View File

@ -26,9 +26,17 @@ extern "C" {
#define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_VER_NUMBER 4
#define MND_STREAM_CREATE_NAME "stream-create"
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
#define MND_STREAM_PAUSE_NAME "stream-pause"
#define MND_STREAM_RESUME_NAME "stream-resume"
#define MND_STREAM_DROP_NAME "stream-drop"
#define MND_STREAM_TASK_RESET_NAME "stream-task-reset"
#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update"
typedef struct SStreamTransInfo {
int64_t startTime;
int64_t streamUid;
int64_t streamId;
const char *name;
int32_t transId;
} SStreamTransInfo;
@ -41,7 +49,7 @@ typedef struct SVgroupChangeInfo {
// time to generated the checkpoint, if now() - checkpointTs >= tsCheckpointInterval, this checkpoint will be discard
// to avoid too many checkpoints for a taskk in the waiting list
typedef struct SCheckpointCandEntry {
char * pName;
char *pName;
int64_t streamId;
int64_t checkpointTs;
int64_t checkpointId;
@ -62,6 +70,9 @@ typedef struct SStreamExecInfo {
SHashObj *pTransferStateStreams;
} SStreamExecInfo;
extern SStreamExecInfo execInfo;
typedef struct SStreamTaskIter SStreamTaskIter;
typedef struct SNodeEntry {
int32_t nodeId;
bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot.
@ -69,21 +80,11 @@ typedef struct SNodeEntry {
int64_t hbTimestamp; // second
} SNodeEntry;
typedef struct SFailedCheckpointInfo {
int64_t streamUid;
int64_t checkpointId;
int32_t transId;
} SFailedCheckpointInfo;
#define MND_STREAM_CREATE_NAME "stream-create"
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
#define MND_STREAM_PAUSE_NAME "stream-pause"
#define MND_STREAM_RESUME_NAME "stream-resume"
#define MND_STREAM_DROP_NAME "stream-drop"
#define MND_STREAM_TASK_RESET_NAME "stream-task-reset"
#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update"
extern SStreamExecInfo execInfo;
typedef struct SOrphanTask {
int64_t streamId;
int32_t taskId;
int32_t nodeId;
} SOrphanTask;
int32_t mndInitStream(SMnode *pMnode);
void mndCleanupStream(SMnode *pMnode);
@ -91,35 +92,38 @@ SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName);
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId);
int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId);
bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock);
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId);
int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamUid);
int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId);
bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamUid, const char *pTransName, bool lock);
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamUid);
// for sma
// TODO refactor
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams);
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream);
SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode);
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg);
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);
int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams);
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream);
SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode);
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg);
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);
int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);
int32_t mndProcessStreamHb(SRpcMsg *pReq);
void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode);
int32_t initStreamNodeList(SMnode *pMnode);
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj* pStream, int8_t igUntreated);
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated);
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList);
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream);
void destroyStreamTaskIter(SStreamTaskIter *pIter);
bool streamTaskIterNextTask(SStreamTaskIter *pIter);
SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter);
#ifdef __cplusplus
}

View File

@ -599,7 +599,8 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
pDetail->compactId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
pDetail->newNumberFileset, pDetail->newFinished);
if(pDetail->numberFileset < pDetail->newNumberFileset || pDetail->finished < pDetail->newFinished)
//these 2 number will jump back after dnode restart, so < is not used here
if(pDetail->numberFileset != pDetail->newNumberFileset || pDetail->finished != pDetail->newFinished)
needSave = true;
}

View File

@ -101,8 +101,9 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *
goto FAILED;
}
if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) {
code = -1;
if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0 || grantCheck(TSDB_GRANT_SUBSCRIBE) < 0) {
code = TSDB_CODE_MND_NO_RIGHTS;
terrno = TSDB_CODE_MND_NO_RIGHTS;
goto FAILED;
}
@ -220,22 +221,53 @@ FAIL:
return -1;
}
static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, char* user){
rsp->topicPrivileges = taosArrayInit(taosArrayGetSize(pConsumer->currentTopics), sizeof(STopicPrivilege));
if(rsp->topicPrivileges == NULL){
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
for(int32_t i = 0; i < taosArrayGetSize(pConsumer->currentTopics); i++){
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
SMqTopicObj* pTopic = mndAcquireTopic(pMnode, topic);
if (pTopic == NULL) { // terrno has been set by callee function
continue;
}
STopicPrivilege *data = taosArrayReserve(rsp->topicPrivileges, 1);
strcpy(data->topic, topic);
if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 || grantCheck(TSDB_GRANT_SUBSCRIBE) < 0) {
data->noPrivilege = 1;
} else{
data->noPrivilege = 0;
}
mndReleaseTopic(pMnode, pTopic);
}
return 0;
}
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
int32_t code = 0;
SMnode *pMnode = pMsg->info.node;
SMqHbReq req = {0};
SMqHbRsp rsp = {0};
SMqConsumerObj *pConsumer = NULL;
if ((code = tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req)) < 0) {
if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
int64_t consumerId = req.consumerId;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) {
mError("consumer:0x%" PRIx64 " not exist", consumerId);
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
code = -1;
code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
goto end;
}
code = checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user);
if(code != 0){
goto end;
}
@ -280,9 +312,22 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
mndReleaseSubscribe(pMnode, pSub);
}
mndReleaseConsumer(pMnode, pConsumer);
// encode rsp
int32_t tlen = tSerializeSMqHbRsp(NULL, 0, &rsp);
void *buf = rpcMallocCont(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
tSerializeSMqHbRsp(buf, tlen, &rsp);
pMsg->info.rsp = buf;
pMsg->info.rspLen = tlen;
end:
tDeatroySMqHbRsp(&rsp);
mndReleaseConsumer(pMnode, pConsumer);
tDeatroySMqHbReq(&req);
return code;
}
@ -500,6 +545,11 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
char *msgStr = pMsg->pCont;
if(grantCheck(TSDB_GRANT_SUBSCRIBE) < 0){
terrno = TSDB_CODE_GRANT_EXPIRED;
return -1;
}
SCMSubscribeReq subscribe = {0};
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
@ -525,7 +575,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
}
// check topic existence
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "subscribe");
if (pTrans == NULL) {
goto _over;
}

View File

@ -30,9 +30,6 @@ int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType op
}
int32_t mndCheckTopicPrivilege(SMnode *pMnode, const char *user, EOperType operType, SMqTopicObj *pTopic) { return 0; }
int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *topicName) {
return 0;
}
int32_t mndSetUserWhiteListRsp(SMnode *pMnode, SUserObj *pUser, SGetUserWhiteListRsp *pWhiteListRsp) {

View File

@ -865,7 +865,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
sdbRelease(pMnode->pSdb, pStream);
goto _OVER;
} else {
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
goto _OVER;
@ -917,7 +917,7 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p
SStreamObj *pStream = mndAcquireStream(pMnode, streamName);
if (pStream != NULL && pStream->smaId == pSma->uid) {
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
mndReleaseStream(pMnode, pStream);
goto _OVER;

View File

@ -476,22 +476,20 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
}
int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) {
int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfTasks; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
if (mndPersistTaskDeployReq(pTrans, pTask) < 0) {
return -1;
}
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
if (mndPersistTaskDeployReq(pTrans, pTask) < 0) {
destroyStreamTaskIter(pIter);
return -1;
}
}
destroyStreamTaskIter(pIter);
// persistent stream task for already stored ts data
if (pStream->conf.fillHistory) {
level = taosArrayGetSize(pStream->pHTasksList);
int32_t level = taosArrayGetSize(pStream->pHTasksList);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->pHTasksList, i);
@ -608,50 +606,6 @@ _OVER:
return -1;
}
static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) { // no valid epset, return directly without redoAction
terrno = code;
return -1;
}
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0);
if (code != 0) {
taosMemoryFree(pReq);
return -1;
}
return 0;
}
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
int32_t lv = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < lv; i++) {
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (mndPersistTaskDropReq(pMnode, pTrans, pTask) < 0) {
return -1;
}
}
}
return 0;
}
static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { // check for number of existed tasks
int32_t numOfStream = 0;
SStreamObj *pStream = NULL;
@ -690,6 +644,11 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
int32_t sqlLen = 0;
terrno = TSDB_CODE_SUCCESS;
if(grantCheck(TSDB_GRANT_STREAMS) < 0){
terrno = TSDB_CODE_GRANT_STREAM_LIMITED;
return -1;
}
SCMCreateStreamReq createStreamReq = {0};
if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
@ -900,6 +859,32 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int
return 0;
}
static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId,
int8_t mndTrigger) {
void *buf;
int32_t tlen;
if (mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
pTask->id.taskId, pTrans->id, mndTrigger) < 0) {
taosMemoryFree(buf);
return -1;
}
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
taosMemoryFree(buf);
return -1;
}
code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY);
if (code != 0) {
taosMemoryFree(buf);
}
return code;
}
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
int8_t mndTrigger, bool lock) {
int32_t code = -1;
@ -909,6 +894,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
return -1;
}
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
if (conflict) {
mndAddtoCheckpointWaitingList(pStream, checkpointId);
@ -931,8 +917,8 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
pStream->currentTick = 1;
// 1. redo action: broadcast checkpoint source msg for all source vg
int32_t totLevel = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < totLevel; i++) {
int32_t totalLevel = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < totalLevel; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTask *p = taosArrayGetP(pLevel, 0);
@ -940,28 +926,9 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
int32_t sz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
code = doSetCheckpointAction(pMnode, pTrans, pTask, checkpointId, mndTrigger);
void *buf;
int32_t tlen;
if (mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
pTask->id.taskId, pTrans->id, mndTrigger) < 0) {
taosWUnLockLatch(&pStream->lock);
goto _ERR;
}
SEpSet epset = {0};
bool hasEpset = false;
code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
taosMemoryFree(buf);
taosWUnLockLatch(&pStream->lock);
goto _ERR;
}
code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset,
TSDB_CODE_SYN_PROPOSE_NOT_READY);
if (code != 0) {
taosMemoryFree(buf);
if (code != TSDB_CODE_SUCCESS) {
taosWUnLockLatch(&pStream->lock);
goto _ERR;
}
@ -1200,7 +1167,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid);
// drop all tasks
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
@ -1264,7 +1231,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
return -1;
} else {
#if 0
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
sdbCancelFetch(pSdb, pIter);
@ -1544,21 +1511,19 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
}
// add row for each task
for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
int32_t numOfLevels = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfLevels; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
int32_t code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows);
if (code == TSDB_CODE_SUCCESS) {
numOfRows++;
}
int32_t code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows);
if (code == TSDB_CODE_SUCCESS) {
numOfRows++;
}
}
// unlock
destroyStreamTaskIter(pIter);
taosRUnLockLatch(&pStream->lock);
sdbRelease(pSdb, pStream);
}
@ -1664,21 +1629,26 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
SMResumeStreamReq pauseReq = {0};
if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
if(grantCheck(TSDB_GRANT_STREAMS) < 0){
terrno = TSDB_CODE_GRANT_EXPIRED;
return -1;
}
SMResumeStreamReq resumeReq = {0};
if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
pStream = mndAcquireStream(pMnode, pauseReq.name);
pStream = mndAcquireStream(pMnode, resumeReq.name);
if (pStream == NULL) {
if (pauseReq.igNotExists) {
mInfo("stream:%s not exist, not resume stream", pauseReq.name);
if (resumeReq.igNotExists) {
mInfo("stream:%s not exist, not resume stream", resumeReq.name);
sdbRelease(pMnode->pSdb, pStream);
return 0;
} else {
mError("stream:%s not exist, failed to resume stream", pauseReq.name);
mError("stream:%s not exist, failed to resume stream", resumeReq.name);
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
return -1;
}
@ -1703,7 +1673,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_RESUME_NAME, "resume the stream");
if (pTrans == NULL) {
mError("stream:%s, failed to resume stream since %s", pauseReq.name, terrstr());
mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
@ -1711,8 +1681,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
// set the resume action
if (mndStreamSetResumeAction(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) {
mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr());
if (mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated) < 0) {
mError("stream:%s, failed to drop task since %s", resumeReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
@ -1898,22 +1868,19 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) {
}
taosWLockLatch(&pStream->lock);
int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
for (int32_t j = 0; j < numOfLevels; ++j) {
SArray *pLevel = taosArrayGetP(pStream->tasks, j);
SStreamTaskIter *pTaskIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pTaskIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pTaskIter);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId};
epsetAssign(&entry.epset, &pTask->info.epSet);
taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
}
SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId};
epsetAssign(&entry.epset, &pTask->info.epSet);
taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
}
destroyStreamTaskIter(pTaskIter);
taosWUnLockLatch(&pStream->lock);
sdbRelease(pSdb, pStream);
}
@ -2099,58 +2066,50 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
}
void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
int32_t level = taosArrayGetSize(pStream->tasks);
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p == NULL) {
STaskStatusEntry entry = {0};
streamTaskStatusInit(&entry, pTask);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfTasks; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p == NULL) {
STaskStatusEntry entry = {0};
streamTaskStatusInit(&entry, pTask);
taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
taosArrayPush(pExecNode->pTaskList, &id);
mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId,
(int32_t)taosArrayGetSize(pExecNode->pTaskList));
}
taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
taosArrayPush(pExecNode->pTaskList, &id);
mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId,
(int32_t)taosArrayGetSize(pExecNode->pTaskList));
}
}
destroyStreamTaskIter(pIter);
}
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfTasks; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p != NULL) {
taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id));
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p != NULL) {
taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id));
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == id.taskId && pId->streamId == id.streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == id.taskId && pId->streamId == id.streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
int32_t num = taosArrayGetSize(pExecNode->pTaskList);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId, num);
break;
}
int32_t num = taosArrayGetSize(pExecNode->pTaskList);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId, num);
break;
}
}
}
}
destroyStreamTaskIter(pIter);
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
}
@ -2181,7 +2140,6 @@ static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numO
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamTaskCheckpointReq req = {0};
SDecoder decoder = {0};

View File

@ -16,6 +16,12 @@
#include "mndStream.h"
#include "mndTrans.h"
typedef struct SFailedCheckpointInfo {
int64_t streamUid;
int64_t checkpointId;
int32_t transId;
} SFailedCheckpointInfo;
static void doExtractTasksFromStream(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
@ -59,61 +65,24 @@ static void addIntoCheckpointList(SArray* pList, const SFailedCheckpointInfo* pI
taosArrayPush(pList, pInfo);
}
static int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint");
if (pTrans == NULL) {
return terrno;
}
/*int32_t code = */mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_RESET_NAME, pStream->uid);
taosWLockLatch(&pStream->lock);
int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
for (int32_t j = 0; j < numOfLevels; ++j) {
SArray *pLevel = taosArrayGetP(pStream->tasks, j);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
// todo extract method, with pause stream task
SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
taosWUnLockLatch(&pStream->lock);
return terrno;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
taosMemoryFree(pReq);
continue;
}
code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0);
if (code != 0) {
taosMemoryFree(pReq);
taosWUnLockLatch(&pStream->lock);
mndTransDrop(pTrans);
return terrno;
}
}
int32_t code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream);
if (code != 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return code;
}
taosWUnLockLatch(&pStream->lock);
int32_t code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
@ -177,10 +146,90 @@ static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
return TSDB_CODE_SUCCESS;
}
static int32_t mndDropOrphanTasks(SMnode* pMnode, SArray* pList) {
SOrphanTask* pTask = taosArrayGet(pList, 0);
// check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
if (conflict) {
return -1;
}
SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};
STrans* pTrans = doCreateTrans(pMnode, &dummyObj, NULL, MND_STREAM_DROP_NAME, "drop stream");
if (pTrans == NULL) {
mError("failed to create trans to drop orphan tasks since %s", terrstr());
return -1;
}
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId);
// drop all tasks
if (mndStreamSetDropActionFromList(pMnode, pTrans, pList) < 0) {
mError("failed to create trans to drop orphan tasks since %s", terrstr());
mndTransDrop(pTrans);
return -1;
}
// drop stream
if (mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED) < 0) {
mndTransDrop(pTrans);
return -1;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
mndTransDrop(pTrans);
return 0;
}
int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void* pIter = NULL;
while(1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
if(pStream->status != STREAM_STATUS__PAUSE){
SMPauseStreamReq reqPause = {0};
strcpy(reqPause.name, pStream->name);
reqPause.igNotExists = 1;
int32_t contLen = tSerializeSMPauseStreamReq(NULL, 0, &reqPause);
void * pHead = rpcMallocCont(contLen);
tSerializeSMPauseStreamReq(pHead, contLen, &reqPause);
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_PAUSE_STREAM,
.pCont = pHead,
.contLen = contLen,
.info = *info,
};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
mInfo("receive pause stream:%s, %s, %p, because grant expired", pStream->name, reqPause.name, reqPause.name);
}
sdbRelease(pSdb, pStream);
}
return 0;
}
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0};
SArray *pList = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
SArray *pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
SArray *pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask));
if(grantCheck(TSDB_GRANT_STREAMS) < 0){
if(suspendAllStreams(pMnode, &pReq->info) < 0){
return -1;
}
}
SDecoder decoder = {0};
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
@ -198,8 +247,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
taosThreadMutexLock(&execInfo.lock);
// extract stream task list
int32_t numOfExisted = taosHashGetSize(execInfo.pTaskMap);
if (numOfExisted == 0) {
if (taosHashGetSize(execInfo.pTaskMap) == 0) {
doExtractTasksFromStream(pMnode);
}
@ -218,6 +266,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
if (pTaskEntry == NULL) {
mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId);
SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId};
taosArrayPush(pOrphanTasks, &oTask);
continue;
}
@ -240,15 +291,13 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
streamTaskStatusCopy(pTaskEntry, p);
if (p->checkpointId != 0) {
if (p->checkpointFailed) {
mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId,
p->checkpointId, p->chkpointTransId);
if ((p->checkpointId != 0) && p->checkpointFailed) {
mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId,
p->checkpointId, p->chkpointTransId);
SFailedCheckpointInfo info = {
.transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId};
addIntoCheckpointList(pList, &info);
}
SFailedCheckpointInfo info = {
.transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId};
addIntoCheckpointList(pFailedTasks, &info);
}
}
@ -266,15 +315,15 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// current checkpoint is failed, rollback from the checkpoint trans
// kill the checkpoint trans and then set all tasks status to be normal
if (taosArrayGetSize(pList) > 0) {
if (taosArrayGetSize(pFailedTasks) > 0) {
bool allReady = true;
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
taosArrayDestroy(p);
if (allReady || snodeChanged) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
SFailedCheckpointInfo *pInfo = taosArrayGet(pList, i);
for(int32_t i = 0; i < taosArrayGetSize(pFailedTasks); ++i) {
SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedTasks, i);
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
pInfo->checkpointId, pInfo->transId);
@ -285,9 +334,16 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
}
// handle the orphan tasks that are invalid but not removed in some vnodes or snode due to some unknown errors.
if (taosArrayGetSize(pOrphanTasks) > 0) {
mndDropOrphanTasks(pMnode, pOrphanTasks);
}
taosThreadMutexUnlock(&execInfo.lock);
streamMetaClearHbMsg(&req);
taosArrayDestroy(pList);
taosArrayDestroy(pFailedTasks);
taosArrayDestroy(pOrphanTasks);
return TSDB_CODE_SUCCESS;
}

View File

@ -23,10 +23,10 @@ typedef struct SKeyInfo {
static int32_t clearFinishedTrans(SMnode* pMnode);
int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamUid) {
int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamId) {
SStreamTransInfo info = {
.transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pTransName, .streamUid = streamUid};
taosHashPut(execInfo.transMgmt.pDBTrans, &streamUid, sizeof(streamUid), &info, sizeof(SStreamTransInfo));
.transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pTransName, .streamId = streamId};
taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId), &info, sizeof(SStreamTransInfo));
return 0;
}
@ -65,7 +65,7 @@ int32_t clearFinishedTrans(SMnode* pMnode) {
return 0;
}
bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char* pTransName, bool lock) {
bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamId, const char* pTransName, bool lock) {
if (lock) {
taosThreadMutexLock(&execInfo.lock);
}
@ -80,7 +80,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char*
clearFinishedTrans(pMnode);
SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamUid, sizeof(streamUid));
SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
if (pEntry != NULL) {
SStreamTransInfo tInfo = *pEntry;
@ -90,7 +90,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char*
if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0)) {
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid,
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
tInfo.name);
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
return true;
@ -99,13 +99,13 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char*
}
} else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) ||
(strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0)) {
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid,
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
tInfo.name);
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
return true;
}
} else {
mDebug("stream:0x%"PRIx64" no conflict trans existed, continue create trans", streamUid);
mDebug("stream:0x%"PRIx64" no conflict trans existed, continue create trans", streamId);
}
if (lock) {
@ -301,86 +301,4 @@ void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
mDebug("complete clear checkpoints in Dbs");
}
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId,
int32_t transId) {
pMsg->streamId = pId->streamId;
pMsg->taskId = pId->taskId;
pMsg->transId = transId;
pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList);
}
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
SStreamTaskId *pId, int32_t transId) {
SStreamTaskNodeUpdateMsg req = {0};
initNodeUpdateMsg(&req, pInfo, pId, transId);
int32_t code = 0;
int32_t blen;
tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code);
if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(req.pNodeList);
return -1;
}
int32_t tlen = sizeof(SMsgHead) + blen;
void *buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(req.pNodeList);
return -1;
}
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
tEncodeStreamTaskUpdateMsg(&encoder, &req);
SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(tlen);
pMsgHead->vgId = htonl(nodeId);
tEncoderClear(&encoder);
*pBuf = buf;
*pLen = tlen;
taosArrayDestroy(req.pNodeList);
return TSDB_CODE_SUCCESS;
}
// todo extract method: traverse stream tasks
// build trans to update the epset
int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);
taosWLockLatch(&pStream->lock);
int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
for (int32_t j = 0; j < numOfLevels; ++j) {
SArray *pLevel = taosArrayGetP(pStream->tasks, j);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
void *pBuf = NULL;
int32_t len = 0;
streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
int32_t code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet, 0);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pBuf);
taosWUnLockLatch(&pStream->lock);
return -1;
}
}
}
taosWUnLockLatch(&pStream->lock);
return 0;
}

View File

@ -18,6 +18,66 @@
#include "tmisce.h"
#include "mndVgroup.h"
struct SStreamTaskIter {
SStreamObj *pStream;
int32_t level;
int32_t ordinalIndex;
int32_t totalLevel;
SStreamTask *pTask;
};
SStreamTaskIter* createStreamTaskIter(SStreamObj* pStream) {
SStreamTaskIter* pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter));
if (pIter == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pIter->level = -1;
pIter->ordinalIndex = 0;
pIter->pStream = pStream;
pIter->totalLevel = taosArrayGetSize(pStream->tasks);
pIter->pTask = NULL;
return pIter;
}
bool streamTaskIterNextTask(SStreamTaskIter* pIter) {
if (pIter->level >= pIter->totalLevel) {
pIter->pTask = NULL;
return false;
}
if (pIter->level == -1) {
pIter->level += 1;
}
while(pIter->level < pIter->totalLevel) {
SArray *pList = taosArrayGetP(pIter->pStream->tasks, pIter->level);
if (pIter->ordinalIndex >= taosArrayGetSize(pList)) {
pIter->level += 1;
pIter->ordinalIndex = 0;
pIter->pTask = NULL;
continue;
}
pIter->pTask = taosArrayGetP(pList, pIter->ordinalIndex);
pIter->ordinalIndex += 1;
return true;
}
pIter->pTask = NULL;
return false;
}
SStreamTask* streamTaskIterGetCurrent(SStreamTaskIter* pIter) {
return pIter->pTask;
}
void destroyStreamTaskIter(SStreamTaskIter* pIter) {
taosMemoryFree(pIter);
}
SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
@ -175,18 +235,16 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT
}
SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) {
for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t numOfLevels = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfLevels; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
if (pTask->id.taskId == pId->taskId) {
return pTask;
}
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
if (pTask->id.taskId == pId->taskId) {
destroyStreamTaskIter(pIter);
return pTask;
}
}
destroyStreamTaskIter(pIter);
return NULL;
}
@ -201,21 +259,20 @@ int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
}
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) {
int32_t size = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < size; i++) {
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (doSetResumeAction(pTrans, pMnode, pTask, igUntreated) < 0) {
return -1;
}
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) {
atomic_store_8(&pTask->status.taskStatus, pTask->status.statusBackup);
}
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
if (doSetResumeAction(pTrans, pMnode, pTask, igUntreated) < 0) {
destroyStreamTaskIter(pIter);
return -1;
}
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) {
atomic_store_8(&pTask->status.taskStatus, pTask->status.statusBackup);
}
}
destroyStreamTaskIter(pIter);
return 0;
}
@ -251,24 +308,238 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
}
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SArray *tasks = pStream->tasks;
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
int32_t size = taosArrayGetSize(tasks);
for (int32_t i = 0; i < size; i++) {
SArray *pTasks = taosArrayGetP(tasks, i);
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
if (doSetPauseAction(pMnode, pTrans, pTask) < 0) {
destroyStreamTaskIter(pIter);
return -1;
}
if (doSetPauseAction(pMnode, pTrans, pTask) < 0) {
return -1;
}
if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) {
atomic_store_8(&pTask->status.statusBackup, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
}
if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) {
atomic_store_8(&pTask->status.statusBackup, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
}
}
destroyStreamTaskIter(pIter);
return 0;
}
}
static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) { // no valid epset, return directly without redoAction
terrno = code;
return -1;
}
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0);
if (code != 0) {
taosMemoryFree(pReq);
return -1;
}
return 0;
}
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while(streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
if (doSetDropAction(pMnode, pTrans, pTask) < 0) {
destroyStreamTaskIter(pIter);
return -1;
}
}
destroyStreamTaskIter(pIter);
return 0;
}
static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask* pTask) {
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->nodeId);
pReq->taskId = pTask->taskId;
pReq->streamId = pTask->streamId;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->taskId, pTask->nodeId);
if (code != TSDB_CODE_SUCCESS || (!hasEpset)) { // no valid epset, return directly without redoAction
terrno = code;
taosMemoryFree(pReq);
return -1;
}
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0);
if (code != 0) {
taosMemoryFree(pReq);
return -1;
}
return 0;
}
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* pList) {
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
SOrphanTask* pTask = taosArrayGet(pList, i);
mDebug("add drop task:0x%x action to drop orphan task", pTask->taskId);
doSetDropActionFromId(pMnode, pTrans, pTask);
}
return 0;
}
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId,
int32_t transId) {
pMsg->streamId = pId->streamId;
pMsg->taskId = pId->taskId;
pMsg->transId = transId;
pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList);
}
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
SStreamTaskId *pId, int32_t transId) {
SStreamTaskNodeUpdateMsg req = {0};
initNodeUpdateMsg(&req, pInfo, pId, transId);
int32_t code = 0;
int32_t blen;
tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code);
if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(req.pNodeList);
return -1;
}
int32_t tlen = sizeof(SMsgHead) + blen;
void *buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(req.pNodeList);
return -1;
}
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
tEncodeStreamTaskUpdateMsg(&encoder, &req);
SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(tlen);
pMsgHead->vgId = htonl(nodeId);
tEncoderClear(&encoder);
*pBuf = buf;
*pLen = tlen;
taosArrayDestroy(req.pNodeList);
return TSDB_CODE_SUCCESS;
}
static int32_t doSetUpdateTaskAction(STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) {
void *pBuf = NULL;
int32_t len = 0;
streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
int32_t code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet, 0);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pBuf);
}
return code;
}
// build trans to update the epset
int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);
taosWLockLatch(&pStream->lock);
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
int32_t code = doSetUpdateTaskAction(pTrans, pTask, pInfo);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return -1;
}
}
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return 0;
}
static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return terrno;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
taosMemoryFree(pReq);
return code;
}
code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReq);
}
return code;
}
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
taosWLockLatch(&pStream->lock);
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
int32_t code = doSetResetAction(pMnode, pTrans, pTask);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return -1;
}
}
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return 0;
}

View File

@ -599,6 +599,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
pTrans->originRpcType = pReq->msgType;
}
mInfo("trans:%d, create transaction:%s, origin:%s", pTrans->id, pTrans->opername, opername);
mTrace("trans:%d, local object is created, data:%p", pTrans->id, pTrans);
return pTrans;
}
@ -845,6 +847,8 @@ int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans) {
}
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
if(pTrans == NULL) return -1;
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
return -1;
}

View File

@ -1925,6 +1925,7 @@ static int32_t mndProcessAlterUserPrivilegesReq(SAlterUserReq *pAlterReq, SMnode
return -1;
}
taosHashPut(pNewUser->topics, pTopic->name, len, pTopic->name, TSDB_TOPIC_FNAME_LEN);
mndReleaseTopic(pMnode, pTopic);
}
if (ALTER_USER_DEL_SUBSCRIBE_TOPIC_PRIV(pAlterReq->alterType, pAlterReq->privileges)) {
@ -1935,6 +1936,7 @@ static int32_t mndProcessAlterUserPrivilegesReq(SAlterUserReq *pAlterReq, SMnode
return -1;
}
taosHashRemove(pNewUser->topics, pAlterReq->objname, len);
mndReleaseTopic(pMnode, pTopic);
}
return TSDB_CODE_SUCCESS;

View File

@ -835,8 +835,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
// checkpoint ver is the kept version, handled data should be the next version.
if (pTask->chkInfo.checkpointId != 0) {
pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1;
if (pChkInfo->checkpointId != 0) {
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
}

View File

@ -1080,7 +1080,7 @@ bool qStreamScanhistoryFinished(qTaskInfo_t tinfo) {
int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow;
STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow;
qDebug("%s remove timeWindow filter:%" PRId64 "-%" PRId64 ", set new window:%" PRId64 "-%" PRId64,
GET_TASKID(pTaskInfo), pWindow->skey, pWindow->ekey, INT64_MIN, INT64_MAX);

View File

@ -311,6 +311,7 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int
pInput->totalRows = pBlock->info.rows;
pInput->numOfRows = pBlock->info.rows;
pInput->startRowIndex = 0;
pInput->blankFill = pBlock->info.blankFill;
// NOTE: the last parameter is the primary timestamp column
// todo: refactor this
@ -325,6 +326,7 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int
pInput->totalRows = pBlock->info.rows;
pInput->numOfRows = pBlock->info.rows;
pInput->startRowIndex = 0;
pInput->blankFill = pBlock->info.blankFill;
code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -663,6 +663,8 @@ static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, i
pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1);
pInfo->pResBlock->info.blankFill = false;
if (!pInfo->needCountEmptyTable) {
pInfo->countState = TABLE_COUNT_STATE_END;
} else {
@ -687,6 +689,7 @@ static SSDataBlock* getOneRowResultBlock(SExecTaskInfo* pTaskInfo, STableScanBas
pBlock->info.rows = 1;
pBlock->info.id.uid = tbInfo->uid;
pBlock->info.id.groupId = tbInfo->groupId;
pBlock->info.blankFill = true;
// only one row: set all col data to null & hasNull
int32_t col_num = blockDataGetNumOfCols(pBlock);
@ -696,7 +699,7 @@ static SSDataBlock* getOneRowResultBlock(SExecTaskInfo* pTaskInfo, STableScanBas
}
// set tag/tbname
doSetTagColumnData(pBase, pBlock, pTaskInfo, pBlock->info.rows);
doSetTagColumnData(pBase, pBlock, pTaskInfo, 1);
return pBlock;
}

View File

@ -1334,7 +1334,6 @@ static bool tsortOpenForBufMergeSort(SSortHandle* pHandle) {
int32_t tsortClose(SSortHandle* pHandle) {
atomic_val_compare_exchange_8(&pHandle->closed, 0, 1);
taosMsleep(10);
return TSDB_CODE_SUCCESS;
}

View File

@ -499,6 +499,9 @@ static int64_t getNumOfElems(SqlFunctionCtx* pCtx) {
*/
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
if(1 == pInput->numOfRows && pInput->blankFill) {
return 0;
}
if (pInput->colDataSMAIsSet && pInput->totalRows == pInput->numOfRows) {
numOfElem = pInput->numOfRows - pInput->pColumnDataAgg[0]->numOfNull;
} else {
@ -6022,7 +6025,7 @@ int32_t groupKeyFunction(SqlFunctionCtx* pCtx) {
goto _group_key_over;
}
if (colDataIsNull_s(pInputCol, startIndex)) {
if (pInputCol->pData == NULL || colDataIsNull_s(pInputCol, startIndex)) {
pInfo->isNull = true;
pInfo->hasResult = true;
goto _group_key_over;

View File

@ -440,14 +440,14 @@ static int32_t parseVarbinary(SToken* pToken, uint8_t **pData, uint32_t *nData,
return TSDB_CODE_PAR_INVALID_VARBINARY;
}
if(isHex(pToken->z, pToken->n)){
if(!isValidateHex(pToken->z, pToken->n)){
if(isHex(pToken->z + 1, pToken->n - 2)){
if(!isValidateHex(pToken->z + 1, pToken->n - 2)){
return TSDB_CODE_PAR_INVALID_VARBINARY;
}
void* data = NULL;
uint32_t size = 0;
if(taosHex2Ascii(pToken->z, pToken->n, &data, &size) < 0){
if(taosHex2Ascii(pToken->z + 1, pToken->n - 2, &data, &size) < 0){
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -458,11 +458,13 @@ static int32_t parseVarbinary(SToken* pToken, uint8_t **pData, uint32_t *nData,
*pData = data;
*nData = size;
}else{
if (pToken->n + VARSTR_HEADER_SIZE > bytes) {
*pData = taosMemoryCalloc(1, pToken->n);
int32_t len = trimString(pToken->z, pToken->n, *pData, pToken->n);
*nData = len;
if (*nData + VARSTR_HEADER_SIZE > bytes) {
return TSDB_CODE_PAR_VALUE_TOO_LONG;
}
*pData = taosStrdup(pToken->z);
*nData = pToken->n;
}
return TSDB_CODE_SUCCESS;
}
@ -753,7 +755,7 @@ static int32_t buildCreateTbReq(SVnodeModifyOpStmt* pStmt, STag* pTag, SArray* p
return TSDB_CODE_SUCCESS;
}
static int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
static int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf, int8_t type) {
if ((pToken->type != TK_NOW && pToken->type != TK_TODAY && pToken->type != TK_NK_INTEGER &&
pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT && pToken->type != TK_NK_BOOL &&
pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT &&
@ -763,7 +765,7 @@ static int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMs
}
// Remove quotation marks
if (TK_NK_STRING == pToken->type) {
if (TK_NK_STRING == pToken->type && type != TSDB_DATA_TYPE_VARBINARY) {
if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z);
}
@ -935,7 +937,7 @@ static int32_t parseTagsClauseImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt
SSchema* pTagSchema = &pSchema[pCxt->tags.pColIndex[i]];
isJson = pTagSchema->type == TSDB_DATA_TYPE_JSON;
code = checkAndTrimValue(&token, pCxt->tmpTokenBuf, &pCxt->msg);
code = checkAndTrimValue(&token, pCxt->tmpTokenBuf, &pCxt->msg, pTagSchema->type);
if (TK_NK_VARIABLE == token.type) {
code = buildSyntaxErrMsg(&pCxt->msg, "not expected tags values ", token.z);
}
@ -1631,7 +1633,7 @@ static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql,
static int32_t parseValueToken(SInsertParseContext* pCxt, const char** pSql, SToken* pToken, SSchema* pSchema,
int16_t timePrec, SColVal* pVal) {
int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg);
int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, pSchema->type);
if (TSDB_CODE_SUCCESS == code && isNullValue(pSchema->type, pToken)) {
if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
return buildSyntaxErrMsg(&pCxt->msg, "primary timestamp should not be null", pToken->z);
@ -1691,7 +1693,7 @@ typedef union SRowsDataContext{
static int32_t parseTbnameToken(SInsertParseContext* pCxt, SStbRowsDataContext* pStbRowsCxt, SToken* pToken, bool* pFoundCtbName) {
*pFoundCtbName = false;
int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg);
int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, TSDB_DATA_TYPE_BINARY);
if (TK_NK_VARIABLE == pToken->type) {
code = buildInvalidOperationMsg(&pCxt->msg, "not expected tbname");
}
@ -1731,7 +1733,7 @@ static int32_t processCtbTagsAfterCtbName(SInsertParseContext* pCxt, SVnodeModif
for (int32_t i = 0; code == TSDB_CODE_SUCCESS && i < numOfTagTokens; ++i) {
SToken* pTagToken = (SToken*)(tagTokens + i);
SSchema* pTagSchema = tagSchemas[i];
code = checkAndTrimValue(pTagToken, pCxt->tmpTokenBuf, &pCxt->msg);
code = checkAndTrimValue(pTagToken, pCxt->tmpTokenBuf, &pCxt->msg, pTagSchema->type);
if (TK_NK_VARIABLE == pTagToken->type) {
code = buildInvalidOperationMsg(&pCxt->msg, "not expected tag");
}
@ -1790,7 +1792,7 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
tagSchemas[(*pNumOfTagTokens)] = (SSchema*)pTagSchema;
++(*pNumOfTagTokens);
} else {
code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg);
code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, pTagSchema->type);
if (TK_NK_VARIABLE == pToken->type) {
code = buildInvalidOperationMsg(&pCxt->msg, "not expected row value");
}

View File

@ -390,6 +390,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
pTimeWindow->skey = INT64_MIN;
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
stDebug("s-task:%s after exceed the threshold:%" PRId64 " and then update the window filter",
pStreamTask->id.idStr, pStreamTask->dataRange.range.maxVer);
} else {
stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr);
}
@ -400,7 +402,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// 3. send msg to mnode to launch a checkpoint to keep the state for current stream
streamTaskSendCheckpointReq(pStreamTask);
// streamTaskResume(pStreamTask);
// 4. assign the status to the value that will be kept in disk
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state;
@ -777,6 +778,8 @@ int32_t streamResumeTask(SStreamTask* pTask) {
while (1) {
/*int32_t code = */ doStreamExecTask(pTask);
// check if continue
taosThreadMutexLock(&pTask->lock);
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);

View File

@ -1312,28 +1312,28 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
}
void streamMetaRLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-rlock", pMeta->vgId);
// stTrace("vgId:%d meta-rlock", pMeta->vgId);
taosThreadRwlockRdlock(&pMeta->lock);
}
void streamMetaRUnLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-runlock", pMeta->vgId);
// stTrace("vgId:%d meta-runlock", pMeta->vgId);
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code);
} else {
stDebug("vgId:%d meta-runlock completed", pMeta->vgId);
// stTrace("vgId:%d meta-runlock completed", pMeta->vgId);
}
}
void streamMetaWLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-wlock", pMeta->vgId);
// stTrace("vgId:%d meta-wlock", pMeta->vgId);
taosThreadRwlockWrlock(&pMeta->lock);
stTrace("vgId:%d meta-wlock completed", pMeta->vgId);
// stTrace("vgId:%d meta-wlock completed", pMeta->vgId);
}
void streamMetaWUnLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-wunlock", pMeta->vgId);
// stTrace("vgId:%d meta-wunlock", pMeta->vgId);
taosThreadRwlockUnlock(&pMeta->lock);
}

View File

@ -385,7 +385,6 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
streamTaskOnHandleEventSuccess(pTask->status.pSM, event);
int64_t initTs = pTask->execInfo.init;
@ -989,4 +988,3 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
streamSetParamForStreamScannerStep2(pTask, &verRange, &win);
}
}

View File

@ -670,7 +670,6 @@ void streamStateFreeCur(SStreamStateCur* pCur) {
if (!pCur) {
return;
}
qDebug("streamStateFreeCur");
streamStateResetCur(pCur);
taosMemoryFree(pCur);
}

View File

@ -159,7 +159,7 @@ static void uvStartSendResp(SSvrMsg* msg);
static void uvNotifyLinkBrokenToApp(SSvrConn* conn);
static FORCE_INLINE void destroySmsg(SSvrMsg* smsg);
static FORCE_INLINE void destroySmsg(SSvrMsg* smsg);
static FORCE_INLINE SSvrConn* createConn(void* hThrd);
static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn);
@ -1382,7 +1382,7 @@ void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) {
tFreeSUpdateIpWhiteReq(req);
taosMemoryFree(req);
} else {
tInfo("ip-white-list disable on trans");
tDebug("ip-white-list disable on trans");
thrd->enableIpWhiteList = 0;
}
taosMemoryFree(msg);

View File

@ -159,6 +159,7 @@ void taosFreeQitem(void *pItem) {
int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
int32_t code = 0;
STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode));
pNode->timestamp = taosGetTimestampUs();
pNode->next = NULL;
taosThreadMutexLock(&queue->mutex);
@ -464,6 +465,7 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *
qinfo->ahandle = queue->ahandle;
qinfo->fp = queue->itemsFp;
qinfo->queue = queue;
qinfo->timestamp = queue->head->timestamp;
queue->head = NULL;
queue->tail = NULL;
@ -489,8 +491,8 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *
int32_t taosQallItemSize(STaosQall *qall) { return qall->numOfItems; }
int64_t taosQallMemSize(STaosQall *qall) { return qall->memOfItems; }
int64_t taosQallUnAccessedItemSize(STaosQall *qall) {return qall->unAccessedNumOfItems;}
int64_t taosQallUnAccessedMemSize(STaosQall *qall) {return qall->unAccessMemOfItems;}
int64_t taosQallUnAccessedItemSize(STaosQall *qall) { return qall->unAccessedNumOfItems; }
int64_t taosQallUnAccessedMemSize(STaosQall *qall) { return qall->unAccessMemOfItems; }
void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }

View File

@ -15,10 +15,12 @@
#define _DEFAULT_SOURCE
#include "tworker.h"
#include "tgeosctx.h"
#include "taoserror.h"
#include "tgeosctx.h"
#include "tlog.h"
#define QUEUE_THRESHOLD 1000 * 1000
typedef void *(*ThreadFp)(void *param);
int32_t tQWorkerInit(SQWorkerPool *pool) {
@ -84,6 +86,13 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) {
break;
}
if (qinfo.timestamp != 0) {
int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
if (cost > QUEUE_THRESHOLD) {
uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost);
}
}
if (qinfo.fp != NULL) {
qinfo.workerId = worker->id;
qinfo.threadNum = pool->num;
@ -198,6 +207,13 @@ static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
break;
}
if (qinfo.timestamp != 0) {
int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
if (cost > QUEUE_THRESHOLD) {
uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost);
}
}
if (qinfo.fp != NULL) {
qinfo.workerId = worker->id;
qinfo.threadNum = taosArrayGetSize(pool->workers);
@ -221,7 +237,7 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
int32_t queueNum = taosGetQueueNumber(pool->qset);
int32_t curWorkerNum = taosArrayGetSize(pool->workers);
int32_t dstWorkerNum = ceilf(queueNum * pool->ratio);
if (dstWorkerNum < 1) dstWorkerNum = 1;
if (dstWorkerNum < 2) dstWorkerNum = 2;
// spawn a thread to process queue
while (curWorkerNum < dstWorkerNum) {
@ -338,6 +354,13 @@ static void *tWWorkerThreadFp(SWWorker *worker) {
break;
}
if (qinfo.timestamp != 0) {
int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
if (cost > QUEUE_THRESHOLD) {
uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost);
}
}
if (qinfo.fp != NULL) {
qinfo.workerId = worker->id;
qinfo.threadNum = pool->num;

View File

@ -230,6 +230,7 @@ fi
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -i True
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -n 3 -i True
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform.py -N 2 -n 1
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-stb.py -N 2 -n 1
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-stb.py -N 6 -n 3
#,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-db.py -N 6 -n 3
@ -292,6 +293,7 @@ fi
,,n,system-test,python3 ./test.py -f 0-others/timeRangeWise.py -N 3
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/delete_check.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/test_hot_refresh_configurations.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/subscribe_stream_privilege.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/insert_double.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py

View File

@ -0,0 +1,184 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import time
import taos
from taos.tmq import *
from util.cases import *
from util.common import *
from util.log import *
from util.sql import *
from util.sqlset import *
class TDTestCase:
clientCfgDict = {'debugFlag': 135}
updatecfgDict = {'debugFlag': 143, 'clientCfg':clientCfgDict}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.setsql = TDSetSql()
self.stbname = 'stb'
self.user_name = 'test'
self.binary_length = 20 # the length of binary for column_dict
self.nchar_length = 20 # the length of nchar for column_dict
self.dbnames = ['db1']
self.column_dict = {
'ts': 'timestamp',
'col1': 'float',
'col2': 'int',
'col3': 'float',
}
self.tag_dict = {
't1': 'int',
't2': f'binary({self.binary_length})'
}
self.tag_list = [
f'1, "Beijing"',
f'2, "Shanghai"',
f'3, "Guangzhou"',
f'4, "Shenzhen"'
]
self.values_list = [
f'now, 9.1, 200, 0.3'
]
self.tbnum = 4
self.topic_name = 'topic1'
def prepare_data(self):
for db in self.dbnames:
tdSql.execute(f"create database {db} vgroups 1")
tdSql.execute(f"use {db}")
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict))
for i in range(self.tbnum):
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})')
for j in self.values_list:
tdSql.execute(f'insert into {self.stbname}_{i} values({j})')
def checkUserPrivileges(self, rowCnt):
tdSql.query("show user privileges")
tdSql.checkRows(rowCnt)
def streamTest(self):
tdSql.execute("create stream s1 trigger at_once fill_history 1 into so1 as select ts,abs(col2) from stb partition by tbname")
time.sleep(2)
tdSql.query("select * from so1")
tdSql.checkRows(4)
tdSql.execute("insert into stb_0(ts,col2) values(now, 332)")
time.sleep(2)
tdSql.query("select * from so1")
tdSql.checkRows(5)
time.sleep(2)
tdSql.query("select * from information_schema.ins_stream_tasks")
tdSql.checkData(0, 5, 'ready')
print(time.time())
while 1:
t = time.time()
if t > 1706254434 :
break
else:
print("time:%d" %(t))
time.sleep(1)
tdSql.error("create stream s11 trigger at_once fill_history 1 into so1 as select ts,abs(col2) from stb partition by tbname")
time.sleep(10)
tdSql.query("select * from information_schema.ins_stream_tasks")
tdSql.checkData(0, 5, 'paused')
tdSql.execute("insert into stb_0(ts,col2) values(now, 3232)")
tdSql.query("select * from so1")
tdSql.checkRows(5)
tdSql.error("resume stream s1")
def consumeTest(self):
consumer_dict = {
"group.id": "g1",
"td.connect.user": self.user_name,
"td.connect.pass": "test",
"auto.offset.reset": "earliest"
}
consumer = Consumer(consumer_dict)
tdLog.debug("test subscribe topic created by other user")
exceptOccured = False
try:
consumer.subscribe([self.topic_name])
except TmqError:
exceptOccured = True
if not exceptOccured:
tdLog.exit(f"has no privilege, should except")
self.checkUserPrivileges(1)
tdLog.debug("test subscribe topic privilege granted by other user")
tdSql.execute(f'grant subscribe on {self.topic_name} to {self.user_name}')
self.checkUserPrivileges(2)
exceptOccured = False
try:
consumer.subscribe([self.topic_name])
except TmqError:
exceptOccured = True
if exceptOccured:
tdLog.exit(f"has privilege, should not except")
cnt = 0
try:
while True:
res = consumer.poll(1)
cnt += 1
if cnt == 1:
if not res:
tdLog.exit(f"grant privilege, should get res")
elif cnt == 2:
if res:
tdLog.exit(f"revoke privilege, should get NULL")
else:
break
tdLog.debug("test subscribe topic privilege revoked by other user")
tdSql.execute(f'revoke subscribe on {self.topic_name} from {self.user_name}')
self.checkUserPrivileges(1)
time.sleep(5)
finally:
consumer.close()
def create_user(self):
tdSql.execute(f'create topic {self.topic_name} as database {self.dbnames[0]}')
tdSql.execute(f'create user {self.user_name} pass "test"')
def run(self):
self.prepare_data()
self.create_user()
self.consumeTest()
# self.streamTest()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -12,7 +12,8 @@ sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
# updatecfgDict = {'debugFlag': 135}
clientCfgDict = {'debugFlag': 135}
updatecfgDict = {'debugFlag': 135, 'clientCfg':clientCfgDict}
def __init__(self):
self.vgroups = 2

View File

@ -11,7 +11,9 @@ sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135}
clientCfgDict = {'debugFlag': 135}
updatecfgDict = {'debugFlag': 135, 'clientCfg':clientCfgDict}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")

View File

@ -186,7 +186,7 @@ class TDTestCase:
tmqCom.getStartCommitNotifyFromTmqsim()
#restart dnode & remove wal
# self.restartAndRemoveWal()
self.restartAndRemoveWal()
# redistribute vgroup
self.redistributeVgroups();
@ -235,7 +235,7 @@ class TDTestCase:
tdSql.execute(sqlString)
tdSql.query("flush database %s"%(paraDict['dbName']))
#restart dnode & remove wal
# self.restartAndRemoveWal()
self.restartAndRemoveWal()
# redistribute vgroup
self.redistributeVgroups();
@ -313,7 +313,7 @@ class TDTestCase:
time.sleep(5)
#restart dnode & remove wal
# self.restartAndRemoveWal()
self.restartAndRemoveWal()
# redistribute vgroup
self.redistributeVgroups()

View File

@ -85,7 +85,6 @@ void varbinary_sql_test() {
// test insert
pRes = taos_query(taos, "insert into tb2 using stb tags (2, 'tb2_bin1', 093) values (now + 2s, 'nchar1', 892, 0.3)");
printf("error:%s", taos_errstr(pRes));
ASSERT(taos_errno(pRes) != 0);
pRes = taos_query(taos, "insert into tb3 using stb tags (3, 'tb3_bin1', 0x7f829) values (now + 3s, 'nchar1', 0x7f829, 0.3)");
@ -322,6 +321,60 @@ void varbinary_sql_test() {
printf("%s result %s\n", __FUNCTION__, taos_errstr(pRes));
taos_free_result(pRes);
// test insert string value '\x'
pRes = taos_query(taos, "insert into tb5 using stb tags (5, 'tb5_bin1', '\\\\xg') values (now + 4s, 'nchar1', '\\\\xg', 0.3)");
taos_free_result(pRes);
pRes = taos_query(taos, "select c2,t3 from stb where t3 = '\\x5C7867'");
while ((row = taos_fetch_row(pRes)) != NULL) {
int32_t* length = taos_fetch_lengths(pRes);
void* data = NULL;
uint32_t size = 0;
if(taosAscii2Hex(row[0], length[0], &data, &size) < 0){
ASSERT(0);
}
ASSERT(memcmp(data, "\\x5C7867", size) == 0);
taosMemoryFree(data);
if(taosAscii2Hex(row[1], length[1], &data, &size) < 0){
ASSERT(0);
}
ASSERT(memcmp(data, "\\x5C7867", size) == 0);
taosMemoryFree(data);
}
taos_free_result(pRes);
// test insert
char tmp [65517*2+3] = {0};
tmp[0] = '\\';
tmp[1] = 'x';
memset(tmp + 2, 48, 65517*2);
char sql[65517*2+3 + 256] = {0};
pRes = taos_query(taos, "create stable stb1 (ts timestamp, c2 varbinary(65517)) tags (t1 int, t2 binary(8), t3 varbinary(8))");
taos_free_result(pRes);
sprintf(sql, "insert into tb6 using stb1 tags (6, 'tb6_bin1', '\\\\xg') values (now + 4s, '%s')", tmp);
pRes = taos_query(taos, sql);
taos_free_result(pRes);
pRes = taos_query(taos, "select c2 from tb6");
while ((row = taos_fetch_row(pRes)) != NULL) {
int32_t* length = taos_fetch_lengths(pRes);
void* data = NULL;
uint32_t size = 0;
if(taosAscii2Hex(row[0], length[0], &data, &size) < 0){
ASSERT(0);
}
ASSERT(memcmp(data, tmp, size) == 0);
taosMemoryFree(data);
}
taos_free_result(pRes);
taos_close(taos);
}