fix:error in privilege

This commit is contained in:
wangmm0220 2023-08-25 17:56:17 +08:00
commit 047707fa17
34 changed files with 399 additions and 104 deletions

View File

@ -178,7 +178,7 @@ The following list shows all reserved keywords:
- MATCH
- MAX_DELAY
- MAX_SPEED
- BWLIMIT
- MAXROWS
- MERGE
- META

View File

@ -30,6 +30,10 @@ The source code of `TDengine.Connector` is hosted on [GitHub](https://github.com
The supported platforms are the same as those supported by the TDengine client driver.
:::note
Please note TDengine does not support 32bit Windows any more.
:::
## Version support
Please refer to [version support list](/reference/connector#version-support)

View File

@ -102,6 +102,8 @@ Usage: taosdump [OPTION...] dbname [tbname ...]
-L, --loose-mode Use loose mode if the table name and column name
use letter and number only. Default is NOT.
-n, --no-escape No escape char '`'. Default is using it.
-Q, --dot-replace Repalce dot character with underline character in
the table name.
-T, --thread-num=THREAD_NUM Number of thread for dump in file. Default is
8.
-C, --cloud=CLOUD_DSN specify a DSN to access TDengine cloud service

View File

@ -29,6 +29,10 @@ import CSAsyncQuery from "../07-develop/04-query-data/_cs_async.mdx"
支持的平台和 TDengine 客户端驱动支持的平台一致。
:::note
注意 TDengine 不再支持 32 位 Windows 平台。
:::
## 版本支持
请参考[版本支持列表](../#版本支持)

View File

@ -178,7 +178,7 @@ description: TDengine 保留关键字的详细列表
- MATCH
- MAX_DELAY
- MAX_SPEED
- BWLIMIT
- MAXROWS
- MERGE
- META

View File

@ -105,6 +105,8 @@ Usage: taosdump [OPTION...] dbname [tbname ...]
-L, --loose-mode Using loose mode if the table name and column name
use letter and number only. Default is NOT.
-n, --no-escape No escape char '`'. Default is using it.
-Q, --dot-replace Repalce dot character with underline character in
the table name.
-T, --thread-num=THREAD_NUM Number of thread for dump in file. Default is
8.
-C, --cloud=CLOUD_DSN specify a DSN to access TDengine cloud service

View File

@ -133,6 +133,7 @@
<configuration>
<source>8</source>
<target>8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>

View File

@ -8,4 +8,4 @@ java -jar target/taosdemo-2.0.1-jar-with-dependencies.jar -host <hostname> -data
```
如果发生错误 Exception in thread "main" java.lang.UnsatisfiedLinkError: no taos in java.library.path
请检查是否安装 TDengine 客户端安装包或编译 TDengine 安装。如果确定已经安装过还出现这个错误,可以在命令行 java 后加 -Djava.library.path=/usr/local/lib 来指定寻找共享库的路径。
请检查是否安装 TDengine 客户端安装包或编译 TDengine 安装。如果确定已经安装过还出现这个错误,可以在命令行 java 后加 -Djava.library.path=/usr/lib 来指定寻找共享库的路径。

View File

@ -113,7 +113,7 @@
#define TK_TABLE_PREFIX 95
#define TK_TABLE_SUFFIX 96
#define TK_NK_COLON 97
#define TK_MAX_SPEED 98
#define TK_BWLIMIT 98
#define TK_START 99
#define TK_TIMESTAMP 100
#define TK_END 101

View File

@ -92,6 +92,10 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSubscriptionIm
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNIEnv *, jobject, jlong, jlong);
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAllSync(JNIEnv *, jobject, jlong);
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitOffsetSyncImp(JNIEnv *, jobject, jlong, jstring,
jint, jlong);
/*
* Class: com_taosdata_jdbc_tmq_TMQConnector
* Method: tmqCommitAsync
@ -102,6 +106,12 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *, jobject, jlong, jlong,
jobject);
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAllAsync(JNIEnv *, jobject, jlong,
jobject);
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitOffsetAsync(JNIEnv *, jobject, jlong,
jstring, jint, jlong, jobject);
/*
* Class: com_taosdata_jdbc_tmq_TMQConnector
* Method: tmqUnsubscribeImp
@ -179,6 +189,11 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *, jobject, jlong,
jstring, jobject);
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommittedImp(JNIEnv *, jobject, jlong, jstring,
jint);
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqPositionImp(JNIEnv *, jobject, jlong, jstring, jint);
#ifdef __cplusplus
}
#endif

View File

@ -218,7 +218,16 @@ int32_t smlSetCTableName(SSmlTableInfo *oneTable) {
if (strlen(oneTable->childTableName) == 0) {
SArray *dst = taosArrayDup(oneTable->tags, NULL);
RandTableName rName = {dst, oneTable->sTableName, (uint8_t)oneTable->sTableNameLen, oneTable->childTableName};
ASSERT(oneTable->sTableNameLen < TSDB_TABLE_NAME_LEN);
char superName[TSDB_TABLE_NAME_LEN] = {0};
RandTableName rName = {dst, NULL, (uint8_t)oneTable->sTableNameLen, oneTable->childTableName};
if(tsSmlDot2Underline){
memcpy(superName, oneTable->sTableName, oneTable->sTableNameLen);
smlStrReplace(superName, oneTable->sTableNameLen);
rName.stbFullName = superName;
}else{
rName.stbFullName = oneTable->sTableName;
}
buildChildTableName(&rName);
taosArrayDestroy(dst);
@ -230,6 +239,9 @@ void getTableUid(SSmlHandle *info, SSmlLineInfo *currElement, SSmlTableInfo *tin
char key[TSDB_TABLE_NAME_LEN * 2 + 1] = {0};
size_t nLen = strlen(tinfo->childTableName);
memcpy(key, currElement->measure, currElement->measureLen);
if(tsSmlDot2Underline){
smlStrReplace(key, currElement->measureLen);
}
memcpy(key + currElement->measureLen + 1, tinfo->childTableName, nLen);
void *uid =
taosHashGet(info->tableUids, key,

View File

@ -291,6 +291,39 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNI
TAOS_RES *res = (TAOS_RES *)jres;
return tmq_commit_sync(tmq, res);
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAllSync(JNIEnv *env, jobject jobj, jlong jtmq) {
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniError("jobj:%p, tmq is closed", jobj);
return TMQ_CONSUMER_NULL;
}
return tmq_commit_sync(tmq, NULL);
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitOffsetSyncImp(JNIEnv *env, jobject jobj,
jlong jtmq, jstring jtopic,
jint vgId, jlong offset) {
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniDebug("jobj:%p, tmq is closed", jobj);
return TMQ_CONSUMER_NULL;
}
if (jtopic == NULL) {
jniDebug("jobj:%p, topic is null", jobj);
return TMQ_TOPIC_NULL;
}
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
int code = tmq_commit_offset_sync(tmq, topicName, vgId, offset);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, tmq commit offset error, code:%d, msg:%s", jobj, code, tmq_err2str(code));
}
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
return code;
}
// deprecated
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq,
@ -319,6 +352,27 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsy
tmq_commit_async(tmq, res, consumer_callback, offset);
}
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAllAsync(JNIEnv *env, jobject jobj,
jlong jtmq, jobject offset) {
tmqGlobalMethod(env);
tmq_t *tmq = (tmq_t *)jtmq;
offset = (*env)->NewGlobalRef(env, offset);
tmq_commit_async(tmq, NULL, consumer_callback, offset);
}
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitOffsetAsync(JNIEnv *env, jobject jobj,
jlong jtmq, jstring jtopic,
jint vgId, jlong offset,
jobject callback) {
tmqGlobalMethod(env);
tmq_t *tmq = (tmq_t *)jtmq;
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
callback = (*env)->NewGlobalRef(env, callback);
tmq_commit_offset_async(tmq, topicName, vgId, offset, consumer_callback, callback);
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp(JNIEnv *env, jobject jobj,
jlong jtmq) {
tmq_t *tmq = (tmq_t *)jtmq;
@ -497,9 +551,9 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign
int32_t res = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssignment);
if (res != TSDB_CODE_SUCCESS) {
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
jniError("jobj:%p, tmq get topic assignment error, topic:%s, code:%d, msg:%s", jobj, topicName, res,
tmq_err2str(res));
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
tmq_free_assignment(pAssign);
return (jint)res;
}
@ -518,3 +572,55 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign
tmq_free_assignment(pAssign);
return JNI_SUCCESS;
}
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommittedImp(JNIEnv *env, jobject jobj, jlong jtmq,
jstring jtopic, jint vgId) {
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniDebug("jobj:%p, tmq is closed", jobj);
return TMQ_CONSUMER_NULL;
}
if (jtopic == NULL) {
jniDebug("jobj:%p, topic is null", jobj);
return TMQ_TOPIC_NULL;
}
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
int64_t offset = tmq_committed(tmq, topicName, vgId);
if (offset < JNI_SUCCESS && offset != -2147467247) {
jniError("jobj:%p, tmq get committed offset error, topic:%s, vgId:%d, code:0x%" PRIx64 ", msg:%s", jobj, topicName,
vgId, offset, tmq_err2str(offset));
}
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
return (jlong)offset;
}
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqPositionImp(JNIEnv *env, jobject jobj, jlong jtmq,
jstring jtopic, jint vgId) {
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniDebug("jobj:%p, tmq is closed", jobj);
return TMQ_CONSUMER_NULL;
}
if (jtopic == NULL) {
jniDebug("jobj:%p, topic is null", jobj);
return TMQ_TOPIC_NULL;
}
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
int64_t offset = tmq_position(tmq, topicName, vgId);
if (offset < JNI_SUCCESS) {
jniError("jobj:%p, tmq get position error, topic:%s, vgId:%d, code:0x%" PRIx64 ", msg:%s", jobj, topicName, vgId,
offset, tmq_err2str(offset));
}
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
return (jlong)offset;
}

View File

@ -1340,8 +1340,9 @@ SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
void* pData = colDataGetData(pSrc, rowIdx);
bool isNull = colDataIsNull(pSrc, pDataBlock->info.rows, rowIdx, NULL);
void* pData = NULL;
if (!isNull) pData = colDataGetData(pSrc, rowIdx);
colDataSetVal(pDst, 0, pData, isNull);
}

View File

@ -33,7 +33,7 @@ enum {
int32_t mndInitConsumer(SMnode *pMnode);
void mndCleanupConsumer(SMnode *pMnode);
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId);
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo* info);
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId);
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer);

View File

@ -73,7 +73,7 @@ int32_t mndInitConsumer(SMnode *pMnode) {
void mndCleanupConsumer(SMnode *pMnode) {}
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo* info){
SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
if (pClearMsg == NULL) {
mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d", consumerId, (int32_t)sizeof(SMqConsumerClearMsg));
@ -82,7 +82,11 @@ void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){
pClearMsg->consumerId = consumerId;
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg)};
.msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR,
.pCont = pClearMsg,
.contLen = sizeof(SMqConsumerClearMsg),
.info = *info,
};
mInfo("consumer:0x%" PRIx64 " drop from sdb", consumerId);
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
@ -119,7 +123,7 @@ void mndRebCntDec() {
}
}
static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode) {
static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser) {
int32_t numOfTopics = taosArrayGetSize(pTopicList);
for (int32_t i = 0; i < numOfTopics; i++) {
@ -129,6 +133,11 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *
return -1;
}
if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) {
mndReleaseTopic(pMnode, pTopic);
return -1;
}
mndTransSetDbName(pTrans, pOneTopic, NULL);
if(mndTransCheckConflict(pMnode, pTrans) != 0){
mndReleaseTopic(pMnode, pTopic);
@ -167,7 +176,7 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
if (pTrans == NULL) {
goto FAIL;
}
if(validateTopics(pTrans, pConsumer->assignedTopics, pMnode) != 0){
if(validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user) != 0){
goto FAIL;
}
@ -205,7 +214,7 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pMsg, "clear-csm");
if (pTrans == NULL) goto FAIL;
if (validateTopics(pTrans, pConsumer->assignedTopics, pMnode) != 0){
if (validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user) != 0){
goto FAIL;
}
// this is the drop action, not the update action
@ -292,7 +301,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
if (status == MQ_CONSUMER_STATUS_READY) {
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info);
} else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
taosRLockLatch(&pConsumer->lock);
int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
@ -307,7 +316,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
}
} else if (status == MQ_CONSUMER_STATUS_LOST) {
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info);
}
} else { // MQ_CONSUMER_STATUS_REBALANCE
taosRLockLatch(&pConsumer->lock);
@ -384,6 +393,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
.msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
.pCont = pRecoverMsg,
.contLen = sizeof(SMqConsumerRecoverMsg),
.info = pMsg->info,
};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
}
@ -458,6 +468,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
.msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
.pCont = pRecoverMsg,
.contLen = sizeof(SMqConsumerRecoverMsg),
.info = pMsg->info,
};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
@ -646,7 +657,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
goto _over;
}
code = validateTopics(pTrans, pTopicList, pMnode);
code = validateTopics(pTrans, pTopicList, pMnode, pMsg->info.conn.user);
if (code != TSDB_CODE_SUCCESS) {
goto _over;
}

View File

@ -1564,6 +1564,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
}
if (pStream->status != STREAM_STATUS__PAUSE) {
sdbRelease(pMnode->pSdb, pStream);
return 0;
}

View File

@ -824,7 +824,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
}
if (strcmp(dropReq.cgroup, pConsumer->cgroup) == 0) {
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info);
}
sdbRelease(pMnode->pSdb, pConsumer);
}

View File

@ -386,6 +386,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pReq, "create-topic");
if (pTrans == NULL) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
code = -1;
goto _OUT;
}
@ -400,7 +401,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
tstrncpy(topicObj.createUser, userName, TSDB_USER_LEN);
if (mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_TOPIC, &topicObj) != 0) {
code = mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_TOPIC, &topicObj);
if (code != 0) {
goto _OUT;
}
@ -418,6 +420,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
if (pCreate->withMeta) {
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
code = terrno;
goto _OUT;
}
@ -426,13 +429,15 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
qDebugL("topic:%s ast %s", topicObj.name, topicObj.ast);
if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
code = nodesStringToNode(pCreate->ast, &pAst);
if (code != 0) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
goto _OUT;
}
SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
code = qCreateQueryPlan(&cxt, &pPlan, NULL);
if (code != 0) {
mError("failed to create topic:%s since %s", pCreate->name, terrstr());
goto _OUT;
}
@ -440,6 +445,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
topicObj.ntbColIds = taosArrayInit(0, sizeof(int16_t));
if (topicObj.ntbColIds == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = terrno;
goto _OUT;
}
@ -450,12 +456,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
topicObj.ntbColIds = NULL;
}
if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
code = qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema);
if (code != 0) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
goto _OUT;
}
if (nodesNodeToString((SNode *)pPlan, false, &topicObj.physicalPlan, NULL) != 0) {
code = nodesNodeToString((SNode *)pPlan, false, &topicObj.physicalPlan, NULL);
if (code != 0) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
goto _OUT;
}
@ -463,6 +471,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName);
if (pStb == NULL) {
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
code = terrno;
goto _OUT;
}
@ -485,6 +494,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
code = -1;
goto _OUT;
}
@ -510,7 +520,6 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
// encoder check alter info
int32_t len;
int32_t code;
tEncodeSize(tEncodeSTqCheckInfo, &info, len, code);
if (code < 0) {
sdbRelease(pSdb, pVgroup);
@ -525,6 +534,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup);
sdbCancelFetch(pSdb, pIter);
code = -1;
goto _OUT;
}
tEncoderClear(&encoder);
@ -539,6 +549,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup);
sdbCancelFetch(pSdb, pIter);
code = -1;
goto _OUT;
}
buf = NULL;
@ -548,6 +559,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
code = -1;
goto _OUT;
}
@ -723,7 +735,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
}
if (found){
if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) {
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pReq->info);
mndReleaseConsumer(pMnode, pConsumer);
continue;
}
@ -769,7 +781,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
}
code = mndDropSubByTopic(pMnode, pTrans, dropReq.name);
if ( code < 0) {
if (code < 0) {
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
goto end;
}

View File

@ -296,53 +296,70 @@ int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
}
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
int32_t code = walNextValidMsg(pReader);
if (code != TSDB_CODE_SUCCESS) {
int32_t code = 0;
while(1) {
code = walNextValidMsg(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
SWalCont* pCont = &pReader->pHead->head;
int64_t ver = pCont->version;
if (ver > maxVer) {
tqDebug("maxVer in WAL:%" PRId64 " reached current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id);
return TSDB_CODE_SUCCESS;
}
if (pCont->msgType == TDMT_VND_SUBMIT) {
void* pBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
int32_t len = pCont->bodyLen - sizeof(SSubmitReq2Msg);
void* data = taosMemoryMalloc(len);
if (data == NULL) {
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
code = TSDB_CODE_OUT_OF_MEMORY;
terrno = code;
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
return code;
}
memcpy(data, pBody, len);
SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
*pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT);
if (*pItem == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
terrno = code;
tqError("%s failed to create data submit for stream since out of memory", id);
return code;
}
} else if (pCont->msgType == TDMT_VND_DELETE) {
void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
int32_t len = pCont->bodyLen - sizeof(SMsgHead);
code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
if (code == TSDB_CODE_SUCCESS) {
if (*pItem == NULL) {
tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);
// we need to continue check next data in the wal files.
continue;
} else {
tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver);
}
} else {
terrno = code;
tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
return code;
}
} else {
ASSERT(0);
}
return code;
}
int64_t ver = pReader->pHead->head.version;
if (ver > maxVer) {
tqDebug("maxVer in WAL:%"PRId64" reached current:%"PRId64", do not scan wal anymore, %s", maxVer, ver, id);
return TSDB_CODE_SUCCESS;
}
if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg));
int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
void* data = taosMemoryMalloc(len);
if (data == NULL) {
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
return -1;
}
memcpy(data, pBody, len);
SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
*pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT);
if (*pItem == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("%s failed to create data submit for stream since out of memory", id);
return terrno;
}
} else if (pReader->pHead->head.msgType == TDMT_VND_DELETE) {
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead));
int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead);
code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
if (code != TSDB_CODE_SUCCESS) {
tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
} else {
tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver);
}
} else {
ASSERT(0);
}
return 0;
}
// todo ignore the error in wal?

View File

@ -177,6 +177,7 @@ static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, i
}
// todo time window chosen problem: t or prev value?
if (t > pInfo->pFillInfo->start) t -= pInterval->sliding;
taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, t);
}
}
@ -838,6 +839,7 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
(pFillSup->next.key == pFillInfo->nextRowKey && !hasPrevWindow(pFillSup)))) {
setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_START;
resetFillWindow(&pFillSup->prev);
pFillSup->prev.key = pFillSup->cur.key;
pFillSup->prev.pRowVal = pFillSup->cur.pRowVal;
} else if (hasPrevWindow(pFillSup)) {
@ -851,6 +853,7 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
if (hasPrevWindow(pFillSup)) {
setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_END;
resetFillWindow(&pFillSup->next);
pFillSup->next.key = pFillSup->cur.key;
pFillSup->next.pRowVal = pFillSup->cur.pRowVal;
pFillInfo->preRowKey = INT64_MIN;
@ -1229,8 +1232,6 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) {
SWinKey nextKey = {.groupId = groupId, .ts = ts};
while (pInfo->srcDelRowIndex < pBlock->info.rows) {
void* nextVal = NULL;
int32_t nextLen = 0;
TSKEY delTs = tsStarts[pInfo->srcDelRowIndex];
uint64_t delGroupId = groupIds[pInfo->srcDelRowIndex];
int32_t code = TSDB_CODE_SUCCESS;
@ -1245,7 +1246,7 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) {
if (delTs == nextKey.ts) {
code = pAPI->stateStore.streamStateCurNext(pOperator->pTaskInfo->streamInfo.pState, pCur);
if (code == TSDB_CODE_SUCCESS) {
code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &nextKey, (const void**)&nextVal, &nextLen);
code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &nextKey, NULL, NULL);
}
// ts will be deleted later
if (delTs != ts) {

View File

@ -972,7 +972,8 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, slotId);
SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, j);
bool isNull = colDataIsNull(pSrcCol, pSrc->info.rows, rowIndex, NULL);
char* pSrcData = colDataGetData(pSrcCol, rowIndex);
char* pSrcData = NULL;
if (!isNull) pSrcData = colDataGetData(pSrcCol, rowIndex);
colDataSetVal(pDestCol, pDest->info.rows, pSrcData, isNull);
}
pDest->info.rows++;

View File

@ -372,6 +372,10 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
}
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
printDataBlock(p, "project");
}
return (p->info.rows > 0) ? p : NULL;
}

View File

@ -1247,7 +1247,8 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
SColumnInfoData* pSrcCol = taosArrayGet(tmpBlock->pDataBlock, j);
SColumnInfoData* pDestCol = taosArrayGet(pResult->pDataBlock, j);
bool isNull = colDataIsNull(pSrcCol, tmpBlock->info.rows, i, NULL);
char* pSrcData = colDataGetData(pSrcCol, i);
char* pSrcData = NULL;
if (!isNull) pSrcData = colDataGetData(pSrcCol, i);
colDataSetVal(pDestCol, pResult->info.rows, pSrcData, isNull);
}
pResult->info.rows++;

View File

@ -2905,6 +2905,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
destroyStreamAggSupporter(&pInfo->streamAggSup);
cleanupExprSupp(&pInfo->scalarSupp);
if (pInfo->pChildren != NULL) {
int32_t size = taosArrayGetSize(pInfo->pChildren);
@ -2921,6 +2922,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
blockDataDestroy(pInfo->pUpdateRes);
tSimpleHashCleanup(pInfo->pStUpdated);
tSimpleHashCleanup(pInfo->pStDeleted);
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
taosArrayDestroy(pInfo->historyWins);
taosMemoryFreeClear(param);
@ -3240,6 +3242,31 @@ static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo*
return winNum;
}
static void compactSessionSemiWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin) {
SExprSupp* pSup = &pOperator->exprSupp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SResultRow* pCurResult = NULL;
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
// Just look for the window behind StartIndex
while (1) {
SResultWindowInfo winInfo = {0};
SStreamStateCur* pCur = getNextSessionWinInfo(pAggSup, NULL, pCurWin, &winInfo);
if (!IS_VALID_SESSION_WIN(winInfo) || !isInWindow(pCurWin, winInfo.sessionWin.win.skey, pAggSup->gap) ||
!inWinRange(&pAggSup->winRange, &winInfo.sessionWin.win)) {
taosMemoryFree(winInfo.pOutputBuf);
pAPI->stateStore.streamStateFreeCur(pCur);
break;
}
pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey);
doDeleteSessionWindow(pAggSup, &winInfo.sessionWin);
pAPI->stateStore.streamStateFreeCur(pCur);
taosMemoryFree(winInfo.pOutputBuf);
}
}
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) {
saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize, &pAggSup->stateStore);
pWinInfo->pOutputBuf = NULL;
@ -3417,9 +3444,9 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo
}
static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SSHashObj* pStUpdated) {
SExprSupp* pSup = &pOperator->exprSupp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SExprSupp* pSup = &pOperator->exprSupp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
int32_t size = taosArrayGetSize(pWinArray);
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
@ -3446,6 +3473,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
int32_t code = getSessionWinBuf(pChAggSup, pCur, &childWin);
if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &childWin.sessionWin.win)) {
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore);
continue;
}
@ -3454,6 +3482,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
setSessionOutputBuf(pAggSup, pWinKey->win.skey, pWinKey->win.ekey, pWinKey->groupId, &parentWin);
code = initSessionOutputBuf(&parentWin, &pResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore);
break;
}
}
@ -3464,7 +3493,9 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL, true);
saveResult(parentWin, pStUpdated);
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore);
} else {
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore);
break;
}
}
@ -3703,11 +3734,11 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
}
void streamSessionReleaseState(SOperatorInfo* pOperator) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData, resSize);
}
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME,
strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData,
resSize);
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.releaseStreamStateFn) {
downstream->fpSet.releaseStreamStateFn(downstream);
@ -3719,6 +3750,33 @@ void resetWinRange(STimeWindow* winRange) {
winRange->ekey = INT64_MAX;
}
void streamSessionSemiReloadState(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
resetWinRange(&pAggSup->winRange);
SResultWindowInfo winInfo = {0};
int32_t size = 0;
void* pBuf = NULL;
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME,
strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size);
int32_t num = size / sizeof(SSessionKey);
SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf;
ASSERT(size == num * sizeof(SSessionKey));
for (int32_t i = 0; i < num; i++) {
SResultWindowInfo winInfo = {0};
setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo);
compactSessionSemiWindow(pOperator, &winInfo);
saveSessionOutputBuf(pAggSup, &winInfo);
}
taosMemoryFree(pBuf);
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream);
}
}
void streamSessionReloadState(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
@ -3948,6 +4006,11 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated);
tSimpleHashCleanup(pInfo->pStUpdated);
pInfo->pStUpdated = NULL;
if(pInfo->isHistoryOp) {
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
}
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
pInfo->pUpdated = NULL;
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
@ -3996,8 +4059,8 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL,
destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL);
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionSemiReloadState);
}
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState);
setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = pPhyNode->type;
@ -4035,6 +4098,7 @@ void destroyStreamStateOperatorInfo(void* param) {
cleanupBasicInfo(&pInfo->binfo);
destroyStreamAggSupporter(&pInfo->streamAggSup);
cleanupGroupResInfo(&pInfo->groupResInfo);
cleanupExprSupp(&pInfo->scalarSupp);
if (pInfo->pChildren != NULL) {
int32_t size = taosArrayGetSize(pInfo->pChildren);
for (int32_t i = 0; i < size; i++) {
@ -4048,6 +4112,7 @@ void destroyStreamStateOperatorInfo(void* param) {
taosArrayDestroy(pInfo->historyWins);
tSimpleHashCleanup(pInfo->pSeUpdated);
tSimpleHashCleanup(pInfo->pSeDeleted);
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
taosMemoryFreeClear(param);
}

View File

@ -286,7 +286,7 @@ retention(A) ::= NK_VARIABLE(B) NK_COLON NK_VARIABLE(C).
%type speed_opt { int32_t }
%destructor speed_opt { }
speed_opt(A) ::= . { A = 0; }
speed_opt(A) ::= MAX_SPEED NK_INTEGER(B). { A = taosStr2Int32(B.z, NULL, 10); }
speed_opt(A) ::= BWLIMIT NK_INTEGER(B). { A = taosStr2Int32(B.z, NULL, 10); }
start_opt(A) ::= . { A = NULL; }
start_opt(A) ::= START WITH NK_INTEGER(B). { A = createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); }

View File

@ -137,7 +137,7 @@ static SKeyword keywordTable[] = {
{"MATCH", TK_MATCH},
{"MAXROWS", TK_MAXROWS},
{"MAX_DELAY", TK_MAX_DELAY},
{"MAX_SPEED", TK_MAX_SPEED},
{"BWLIMIT", TK_BWLIMIT},
{"MERGE", TK_MERGE},
{"META", TK_META},
{"ONLY", TK_ONLY},

View File

@ -1150,7 +1150,7 @@ static const YYCODETYPE yyFallback[] = {
0, /* TABLE_PREFIX => nothing */
0, /* TABLE_SUFFIX => nothing */
0, /* NK_COLON => nothing */
0, /* MAX_SPEED => nothing */
0, /* BWLIMIT => nothing */
0, /* START => nothing */
0, /* TIMESTAMP => nothing */
287, /* END => ABORT */
@ -1575,7 +1575,7 @@ static const char *const yyTokenName[] = {
/* 95 */ "TABLE_PREFIX",
/* 96 */ "TABLE_SUFFIX",
/* 97 */ "NK_COLON",
/* 98 */ "MAX_SPEED",
/* 98 */ "BWLIMIT",
/* 99 */ "START",
/* 100 */ "TIMESTAMP",
/* 101 */ "END",
@ -2114,7 +2114,7 @@ static const char *const yyRuleName[] = {
/* 140 */ "retention_list ::= retention_list NK_COMMA retention",
/* 141 */ "retention ::= NK_VARIABLE NK_COLON NK_VARIABLE",
/* 142 */ "speed_opt ::=",
/* 143 */ "speed_opt ::= MAX_SPEED NK_INTEGER",
/* 143 */ "speed_opt ::= BWLIMIT NK_INTEGER",
/* 144 */ "start_opt ::=",
/* 145 */ "start_opt ::= START WITH NK_INTEGER",
/* 146 */ "start_opt ::= START WITH NK_STRING",
@ -3335,7 +3335,7 @@ static const YYCODETYPE yyRuleInfoLhs[] = {
366, /* (140) retention_list ::= retention_list NK_COMMA retention */
369, /* (141) retention ::= NK_VARIABLE NK_COLON NK_VARIABLE */
361, /* (142) speed_opt ::= */
361, /* (143) speed_opt ::= MAX_SPEED NK_INTEGER */
361, /* (143) speed_opt ::= BWLIMIT NK_INTEGER */
362, /* (144) start_opt ::= */
362, /* (145) start_opt ::= START WITH NK_INTEGER */
362, /* (146) start_opt ::= START WITH NK_STRING */
@ -3940,7 +3940,7 @@ static const signed char yyRuleInfoNRhs[] = {
-3, /* (140) retention_list ::= retention_list NK_COMMA retention */
-3, /* (141) retention ::= NK_VARIABLE NK_COLON NK_VARIABLE */
0, /* (142) speed_opt ::= */
-2, /* (143) speed_opt ::= MAX_SPEED NK_INTEGER */
-2, /* (143) speed_opt ::= BWLIMIT NK_INTEGER */
0, /* (144) start_opt ::= */
-3, /* (145) start_opt ::= START WITH NK_INTEGER */
-3, /* (146) start_opt ::= START WITH NK_STRING */
@ -5016,7 +5016,7 @@ static YYACTIONTYPE yy_reduce(
case 330: /* bufsize_opt ::= */ yytestcase(yyruleno==330);
{ yymsp[1].minor.yy416 = 0; }
break;
case 143: /* speed_opt ::= MAX_SPEED NK_INTEGER */
case 143: /* speed_opt ::= BWLIMIT NK_INTEGER */
case 331: /* bufsize_opt ::= BUFSIZE NK_INTEGER */ yytestcase(yyruleno==331);
{ yymsp[-1].minor.yy416 = taosStr2Int32(yymsp[0].minor.yy0.z, NULL, 10); }
break;

View File

@ -286,7 +286,7 @@ TEST_F(ParserShowToUseTest, trimDatabase) {
run("TRIM DATABASE wxy_db");
setTrimDbReq("wxy_db", 100);
run("TRIM DATABASE wxy_db MAX_SPEED 100");
run("TRIM DATABASE wxy_db BWLIMIT 100");
}
TEST_F(ParserShowToUseTest, useDatabase) {

View File

@ -1613,6 +1613,9 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey*
const char* curKey = rocksdb_iter_key(pCur->iter, (size_t*)&kLen);
stateSessionKeyDecode((void*)&ktmp, (char*)curKey);
if (pVal != NULL) *pVal = NULL;
if (pVLen != NULL) *pVLen = 0;
SStateSessionKey* pKTmp = &ktmp;
const char* vval = rocksdb_iter_value(pCur->iter, (size_t*)&vLen);
char* val = NULL;
@ -1620,19 +1623,23 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey*
if (len < 0) {
return -1;
}
if (pKTmp->opNum != pCur->number) {
taosMemoryFree(val);
return -1;
}
if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) {
taosMemoryFree(val);
return -1;
}
if (pVal != NULL) {
*pVal = (char*)val;
} else {
taosMemoryFree(val);
}
if (pVLen != NULL) *pVLen = len;
if (pKTmp->opNum != pCur->number) {
return -1;
}
if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) {
return -1;
}
if (pVLen != NULL) *pVLen = len;
*pKey = pKTmp->key;
return 0;
}

View File

@ -885,13 +885,16 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) {
char *data = taosMemoryMalloc(compressSize);
gzFile dstFp = NULL;
TdFilePtr pSrcFile = taosOpenFile(srcFileName, TD_FILE_READ | TD_FILE_STREAM);
TdFilePtr pFile = NULL;
TdFilePtr pSrcFile = NULL;
pSrcFile = taosOpenFile(srcFileName, TD_FILE_READ | TD_FILE_STREAM);
if (pSrcFile == NULL) {
ret = -1;
goto cmp_end;
}
TdFilePtr pFile = taosOpenFile(destFileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
pFile = taosOpenFile(destFileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
ret = -2;
goto cmp_end;
@ -910,6 +913,9 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) {
}
cmp_end:
if (pFile) {
taosCloseFile(&pFile);
}
if (pSrcFile) {
taosCloseFile(&pSrcFile);
}

View File

@ -130,9 +130,18 @@ class TDTestCase:
for j in range(0,60):
tdSql.checkData(i*1500+j, 1, None)
def test_fill_with_order_by(self):
sql = "select _wstart, _wend, count(ts), sum(c1) from meters where ts > '2018-11-25 00:00:00.000' and ts < '2018-11-26 00:00:00.00' interval(1d) fill(NULL) order by _wstart"
tdSql.query(sql)
tdSql.checkRows(1)
sql = "select _wstart, _wend, count(ts), sum(c1) from meters where ts > '2018-11-25 00:00:00.000' and ts < '2018-11-26 00:00:00.00' interval(1d) fill(NULL) order by _wstart desc"
tdSql.query(sql)
tdSql.checkRows(1)
def run(self):
self.prepareTestEnv()
self.test_partition_by_with_interval_fill_prev_new_group_fill_error()
self.test_fill_with_order_by()
def stop(self):
tdSql.close()

View File

@ -67,7 +67,7 @@ class TDTestCase:
tdSql.query(f"select distinct tbname from {dbname}.`sys_if_bytes_out`")
tdSql.checkRows(2)
tdSql.query(f"select * from {dbname}.t_fc70dec6677d4277c5d9799c4da806da order by times")
tdSql.query(f"select * from {dbname}.t_f67972b49aa8adf8bca5d0d54f0d850d order by times")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 1.300000000)
tdSql.checkData(1, 1, 13.000000000)

View File

@ -237,7 +237,7 @@ class TDTestCase:
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt:
if totalConsumeRows < expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")

View File

@ -1533,6 +1533,7 @@ int sml_ts3724_Test() {
const char *sql[] = {
"stb.2,t1=1 f1=283i32 1632299372000",
"stb_2,t1=1 f1=283i32 1632299372000",
".stb2,t1=1 f1=106i32 1632299378000",
"stb2.,t1=1 f1=106i32 1632299378000",
};
@ -1547,6 +1548,18 @@ int sml_ts3724_Test() {
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
taos_free_result(pRes);
pRes = taos_query(taos, "select * from stb_2");
TAOS_ROW row = taos_fetch_row(pRes);
int numRows = taos_affected_rows(pRes);
ASSERT(numRows == 1);
taos_free_result(pRes);
pRes = taos_query(taos, "show stables");
row = taos_fetch_row(pRes);
numRows = taos_affected_rows(pRes);
ASSERT(numRows == 3);
taos_free_result(pRes);
taos_close(taos);
return code;