Merge branch '3.0' into feat/TD-30053-3.0
This commit is contained in:
commit
be44ea0a84
|
@ -329,6 +329,7 @@ typedef enum ENodeType {
|
||||||
QUERY_NODE_SHOW_DB_ALIVE_STMT,
|
QUERY_NODE_SHOW_DB_ALIVE_STMT,
|
||||||
QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT,
|
QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT,
|
||||||
QUERY_NODE_BALANCE_VGROUP_LEADER_STMT,
|
QUERY_NODE_BALANCE_VGROUP_LEADER_STMT,
|
||||||
|
QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT,
|
||||||
QUERY_NODE_RESTORE_DNODE_STMT,
|
QUERY_NODE_RESTORE_DNODE_STMT,
|
||||||
QUERY_NODE_RESTORE_QNODE_STMT,
|
QUERY_NODE_RESTORE_QNODE_STMT,
|
||||||
QUERY_NODE_RESTORE_MNODE_STMT,
|
QUERY_NODE_RESTORE_MNODE_STMT,
|
||||||
|
@ -2425,10 +2426,11 @@ int32_t tDeserializeSRedistributeVgroupReq(void* buf, int32_t bufLen, SRedistrib
|
||||||
void tFreeSRedistributeVgroupReq(SRedistributeVgroupReq* pReq);
|
void tFreeSRedistributeVgroupReq(SRedistributeVgroupReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t useless;
|
int32_t reserved;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int32_t sqlLen;
|
int32_t sqlLen;
|
||||||
char* sql;
|
char* sql;
|
||||||
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
} SBalanceVgroupLeaderReq;
|
} SBalanceVgroupLeaderReq;
|
||||||
|
|
||||||
int32_t tSerializeSBalanceVgroupLeaderReq(void* buf, int32_t bufLen, SBalanceVgroupLeaderReq* pReq);
|
int32_t tSerializeSBalanceVgroupLeaderReq(void* buf, int32_t bufLen, SBalanceVgroupLeaderReq* pReq);
|
||||||
|
|
|
@ -582,6 +582,7 @@ typedef struct SBalanceVgroupStmt {
|
||||||
typedef struct SBalanceVgroupLeaderStmt {
|
typedef struct SBalanceVgroupLeaderStmt {
|
||||||
ENodeType type;
|
ENodeType type;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
|
char dbName[TSDB_DB_NAME_LEN];
|
||||||
} SBalanceVgroupLeaderStmt;
|
} SBalanceVgroupLeaderStmt;
|
||||||
|
|
||||||
typedef struct SMergeVgroupStmt {
|
typedef struct SMergeVgroupStmt {
|
||||||
|
|
|
@ -5962,9 +5962,11 @@ int32_t tSerializeSBalanceVgroupLeaderReq(void *buf, int32_t bufLen, SBalanceVgr
|
||||||
tEncoderInit(&encoder, buf, bufLen);
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
||||||
if (tStartEncode(&encoder) < 0) return -1;
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->useless) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->reserved) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
|
||||||
ENCODESQL();
|
ENCODESQL();
|
||||||
|
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
||||||
|
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
int32_t tlen = encoder.pos;
|
int32_t tlen = encoder.pos;
|
||||||
|
@ -5977,12 +5979,15 @@ int32_t tDeserializeSBalanceVgroupLeaderReq(void *buf, int32_t bufLen, SBalanceV
|
||||||
tDecoderInit(&decoder, buf, bufLen);
|
tDecoderInit(&decoder, buf, bufLen);
|
||||||
|
|
||||||
if (tStartDecode(&decoder) < 0) return -1;
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pReq->useless) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->reserved) < 0) return -1;
|
||||||
if (!tDecodeIsEnd(&decoder)) {
|
if (!tDecodeIsEnd(&decoder)) {
|
||||||
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
DECODESQL();
|
DECODESQL();
|
||||||
|
if (!tDecodeIsEnd(&decoder)) {
|
||||||
|
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
|
@ -446,17 +446,23 @@ typedef struct STimeWindowAggSupp {
|
||||||
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
|
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
|
||||||
} STimeWindowAggSupp;
|
} STimeWindowAggSupp;
|
||||||
|
|
||||||
|
typedef struct SSteamOpBasicInfo {
|
||||||
|
int32_t primaryPkIndex;
|
||||||
|
bool updateOperatorInfo;
|
||||||
|
} SSteamOpBasicInfo;
|
||||||
|
|
||||||
typedef struct SStreamScanInfo {
|
typedef struct SStreamScanInfo {
|
||||||
SExprInfo* pPseudoExpr;
|
SSteamOpBasicInfo basic;
|
||||||
int32_t numOfPseudoExpr;
|
SExprInfo* pPseudoExpr;
|
||||||
SExprSupp tbnameCalSup;
|
int32_t numOfPseudoExpr;
|
||||||
SExprSupp* pPartTbnameSup;
|
SExprSupp tbnameCalSup;
|
||||||
SExprSupp tagCalSup;
|
SExprSupp* pPartTbnameSup;
|
||||||
int32_t primaryTsIndex; // primary time stamp slot id
|
SExprSupp tagCalSup;
|
||||||
int32_t primaryKeyIndex;
|
int32_t primaryTsIndex; // primary time stamp slot id
|
||||||
SReadHandle readHandle;
|
int32_t primaryKeyIndex;
|
||||||
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
|
SReadHandle readHandle;
|
||||||
SColMatchInfo matchInfo;
|
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
|
||||||
|
SColMatchInfo matchInfo;
|
||||||
|
|
||||||
SArray* pBlockLists; // multiple SSDatablock.
|
SArray* pBlockLists; // multiple SSDatablock.
|
||||||
SSDataBlock* pRes; // result SSDataBlock
|
SSDataBlock* pRes; // result SSDataBlock
|
||||||
|
@ -568,10 +574,6 @@ typedef struct SOpCheckPointInfo {
|
||||||
SHashObj* children; // key:child id
|
SHashObj* children; // key:child id
|
||||||
} SOpCheckPointInfo;
|
} SOpCheckPointInfo;
|
||||||
|
|
||||||
typedef struct SSteamOpBasicInfo {
|
|
||||||
int32_t primaryPkIndex;
|
|
||||||
} SSteamOpBasicInfo;
|
|
||||||
|
|
||||||
typedef struct SStreamIntervalOperatorInfo {
|
typedef struct SStreamIntervalOperatorInfo {
|
||||||
SOptrBasicInfo binfo; // basic info
|
SOptrBasicInfo binfo; // basic info
|
||||||
SSteamOpBasicInfo basic;
|
SSteamOpBasicInfo basic;
|
||||||
|
|
|
@ -0,0 +1,32 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
#ifndef STREAM_EXECUTORINT_H
|
||||||
|
#define STREAM_EXECUTORINT_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include "executorInt.h"
|
||||||
|
|
||||||
|
void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type);
|
||||||
|
bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo);
|
||||||
|
void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif // STREAM_EXECUTORINT_H
|
|
@ -20,6 +20,7 @@
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "querynodes.h"
|
#include "querynodes.h"
|
||||||
#include "systable.h"
|
#include "systable.h"
|
||||||
|
#include "streamexecutorInt.h"
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
|
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
@ -2426,10 +2427,13 @@ void streamScanOperatorSaveCheckpoint(SStreamScanInfo* pInfo) {
|
||||||
if (!pInfo->pState) {
|
if (!pInfo->pState) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
void* pBuf = NULL;
|
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
|
||||||
int32_t len = streamScanOperatorEncode(pInfo, &pBuf);
|
void* pBuf = NULL;
|
||||||
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len);
|
int32_t len = streamScanOperatorEncode(pInfo, &pBuf);
|
||||||
taosMemoryFree(pBuf);
|
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len);
|
||||||
|
taosMemoryFree(pBuf);
|
||||||
|
saveStreamOperatorStateComplete(&pInfo->basic);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// other properties are recovered from the execution plan
|
// other properties are recovered from the execution plan
|
||||||
|
@ -2582,6 +2586,7 @@ FETCH_NEXT_BLOCK:
|
||||||
case STREAM_NORMAL:
|
case STREAM_NORMAL:
|
||||||
case STREAM_GET_ALL:
|
case STREAM_GET_ALL:
|
||||||
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||||
|
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
|
||||||
return pBlock;
|
return pBlock;
|
||||||
case STREAM_RETRIEVE: {
|
case STREAM_RETRIEVE: {
|
||||||
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
||||||
|
@ -2622,6 +2627,7 @@ FETCH_NEXT_BLOCK:
|
||||||
|
|
||||||
if (pInfo->pDeleteDataRes->info.rows > 0) {
|
if (pInfo->pDeleteDataRes->info.rows > 0) {
|
||||||
printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo));
|
printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo));
|
||||||
|
setStreamOperatorState(&pInfo->basic, pInfo->pDeleteDataRes->info.type);
|
||||||
return pInfo->pDeleteDataRes;
|
return pInfo->pDeleteDataRes;
|
||||||
} else {
|
} else {
|
||||||
goto FETCH_NEXT_BLOCK;
|
goto FETCH_NEXT_BLOCK;
|
||||||
|
@ -2639,6 +2645,7 @@ FETCH_NEXT_BLOCK:
|
||||||
if (pInfo->pDeleteDataRes->info.rows > 0) {
|
if (pInfo->pDeleteDataRes->info.rows > 0) {
|
||||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
||||||
printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo));
|
printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo));
|
||||||
|
setStreamOperatorState(&pInfo->basic, pInfo->pDeleteDataRes->info.type);
|
||||||
return pInfo->pDeleteDataRes;
|
return pInfo->pDeleteDataRes;
|
||||||
} else {
|
} else {
|
||||||
goto FETCH_NEXT_BLOCK;
|
goto FETCH_NEXT_BLOCK;
|
||||||
|
@ -2652,6 +2659,7 @@ FETCH_NEXT_BLOCK:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||||
|
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
|
||||||
return pBlock;
|
return pBlock;
|
||||||
} else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
|
} else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
qDebug("stream scan mode:%d, %s", pInfo->scanMode, id);
|
qDebug("stream scan mode:%d, %s", pInfo->scanMode, id);
|
||||||
|
@ -2659,6 +2667,7 @@ FETCH_NEXT_BLOCK:
|
||||||
case STREAM_SCAN_FROM_RES: {
|
case STREAM_SCAN_FROM_RES: {
|
||||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||||
doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes);
|
doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes);
|
||||||
|
setStreamOperatorState(&pInfo->basic, pInfo->pRes->info.type);
|
||||||
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||||
pInfo->pRes->info.dataLoad = 1;
|
pInfo->pRes->info.dataLoad = 1;
|
||||||
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
||||||
|
@ -2762,6 +2771,7 @@ FETCH_NEXT_BLOCK:
|
||||||
}
|
}
|
||||||
|
|
||||||
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
|
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
|
||||||
|
setStreamOperatorState(&pInfo->basic, pInfo->pRes->info.type);
|
||||||
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||||
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
#include "operator.h"
|
#include "operator.h"
|
||||||
#include "querytask.h"
|
#include "querytask.h"
|
||||||
|
#include "streamexecutorInt.h"
|
||||||
#include "tchecksum.h"
|
#include "tchecksum.h"
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
@ -415,13 +416,16 @@ void* doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
||||||
|
|
||||||
void doStreamCountSaveCheckpoint(SOperatorInfo* pOperator) {
|
void doStreamCountSaveCheckpoint(SOperatorInfo* pOperator) {
|
||||||
SStreamCountAggOperatorInfo* pInfo = pOperator->info;
|
SStreamCountAggOperatorInfo* pInfo = pOperator->info;
|
||||||
int32_t len = doStreamCountEncodeOpState(NULL, 0, pOperator, true);
|
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
|
||||||
void* buf = taosMemoryCalloc(1, len);
|
int32_t len = doStreamCountEncodeOpState(NULL, 0, pOperator, true);
|
||||||
void* pBuf = buf;
|
void* buf = taosMemoryCalloc(1, len);
|
||||||
len = doStreamCountEncodeOpState(&pBuf, len, pOperator, true);
|
void* pBuf = buf;
|
||||||
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_CHECKPOINT_NAME,
|
len = doStreamCountEncodeOpState(&pBuf, len, pOperator, true);
|
||||||
strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), buf, len);
|
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_CHECKPOINT_NAME,
|
||||||
taosMemoryFree(buf);
|
strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), buf, len);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
saveStreamOperatorStateComplete(&pInfo->basic);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void doResetCountWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock) {
|
void doResetCountWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock) {
|
||||||
|
@ -550,6 +554,7 @@ static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
||||||
|
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
|
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
|
||||||
bool add = pInfo->destHasPrimaryKey && IS_NORMAL_COUNT_OP(pOperator);
|
bool add = pInfo->destHasPrimaryKey && IS_NORMAL_COUNT_OP(pOperator);
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
#include "operator.h"
|
#include "operator.h"
|
||||||
#include "querytask.h"
|
#include "querytask.h"
|
||||||
|
#include "streamexecutorInt.h"
|
||||||
#include "tchecksum.h"
|
#include "tchecksum.h"
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
|
@ -458,13 +459,16 @@ void* doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
||||||
|
|
||||||
void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) {
|
void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) {
|
||||||
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
|
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
|
||||||
int32_t len = doStreamEventEncodeOpState(NULL, 0, pOperator);
|
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
|
||||||
void* buf = taosMemoryCalloc(1, len);
|
int32_t len = doStreamEventEncodeOpState(NULL, 0, pOperator);
|
||||||
void* pBuf = buf;
|
void* buf = taosMemoryCalloc(1, len);
|
||||||
len = doStreamEventEncodeOpState(&pBuf, len, pOperator);
|
void* pBuf = buf;
|
||||||
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_CHECKPOINT_NAME,
|
len = doStreamEventEncodeOpState(&pBuf, len, pOperator);
|
||||||
strlen(STREAM_EVENT_OP_CHECKPOINT_NAME), buf, len);
|
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_CHECKPOINT_NAME,
|
||||||
taosMemoryFree(buf);
|
strlen(STREAM_EVENT_OP_CHECKPOINT_NAME), buf, len);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
saveStreamOperatorStateComplete(&pInfo->basic);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* buildEventResult(SOperatorInfo* pOperator) {
|
static SSDataBlock* buildEventResult(SOperatorInfo* pOperator) {
|
||||||
|
@ -531,6 +535,7 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
||||||
|
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||||
pBlock->info.type == STREAM_CLEAR) {
|
pBlock->info.type == STREAM_CLEAR) {
|
||||||
|
|
|
@ -0,0 +1,30 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "executorInt.h"
|
||||||
|
|
||||||
|
void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type) {
|
||||||
|
if (type != STREAM_GET_ALL && type != STREAM_CHECKPOINT) {
|
||||||
|
pBasicInfo->updateOperatorInfo = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo) {
|
||||||
|
return pBasicInfo->updateOperatorInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo) {
|
||||||
|
pBasicInfo->updateOperatorInfo = false;
|
||||||
|
}
|
|
@ -18,6 +18,7 @@
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
#include "operator.h"
|
#include "operator.h"
|
||||||
#include "querytask.h"
|
#include "querytask.h"
|
||||||
|
#include "streamexecutorInt.h"
|
||||||
#include "tchecksum.h"
|
#include "tchecksum.h"
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
|
@ -1211,13 +1212,16 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
||||||
|
|
||||||
void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) {
|
void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) {
|
||||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
int32_t len = doStreamIntervalEncodeOpState(NULL, 0, pOperator);
|
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
|
||||||
void* buf = taosMemoryCalloc(1, len);
|
int32_t len = doStreamIntervalEncodeOpState(NULL, 0, pOperator);
|
||||||
void* pBuf = buf;
|
void* buf = taosMemoryCalloc(1, len);
|
||||||
len = doStreamIntervalEncodeOpState(&pBuf, len, pOperator);
|
void* pBuf = buf;
|
||||||
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME,
|
len = doStreamIntervalEncodeOpState(&pBuf, len, pOperator);
|
||||||
strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), buf, len);
|
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME,
|
||||||
taosMemoryFree(buf);
|
strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), buf, len);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
saveStreamOperatorStateComplete(&pInfo->basic);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void copyIntervalDeleteKey(SSHashObj* pMap, SArray* pWins) {
|
static void copyIntervalDeleteKey(SSHashObj* pMap, SArray* pWins) {
|
||||||
|
@ -1347,6 +1351,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
pInfo->numOfDatapack++;
|
pInfo->numOfDatapack++;
|
||||||
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
||||||
|
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
|
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
|
||||||
pInfo->binfo.pRes->info.type = pBlock->info.type;
|
pInfo->binfo.pRes->info.type = pBlock->info.type;
|
||||||
|
@ -2690,13 +2695,16 @@ void* doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
||||||
|
|
||||||
void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) {
|
void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) {
|
||||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
int32_t len = doStreamSessionEncodeOpState(NULL, 0, pOperator, true);
|
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
|
||||||
void* buf = taosMemoryCalloc(1, len);
|
int32_t len = doStreamSessionEncodeOpState(NULL, 0, pOperator, true);
|
||||||
void* pBuf = buf;
|
void* buf = taosMemoryCalloc(1, len);
|
||||||
len = doStreamSessionEncodeOpState(&pBuf, len, pOperator, true);
|
void* pBuf = buf;
|
||||||
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME,
|
len = doStreamSessionEncodeOpState(&pBuf, len, pOperator, true);
|
||||||
strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), buf, len);
|
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME,
|
||||||
taosMemoryFree(buf);
|
strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), buf, len);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
saveStreamOperatorStateComplete(&pInfo->basic);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void resetUnCloseSessionWinInfo(SSHashObj* winMap) {
|
void resetUnCloseSessionWinInfo(SSHashObj* winMap) {
|
||||||
|
@ -2766,6 +2774,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
||||||
|
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||||
pBlock->info.type == STREAM_CLEAR) {
|
pBlock->info.type == STREAM_CLEAR) {
|
||||||
|
@ -3176,6 +3185,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
||||||
|
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||||
pBlock->info.type == STREAM_CLEAR) {
|
pBlock->info.type == STREAM_CLEAR) {
|
||||||
|
@ -3673,13 +3683,16 @@ void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
||||||
|
|
||||||
void doStreamStateSaveCheckpoint(SOperatorInfo* pOperator) {
|
void doStreamStateSaveCheckpoint(SOperatorInfo* pOperator) {
|
||||||
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||||
int32_t len = doStreamStateEncodeOpState(NULL, 0, pOperator, true);
|
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
|
||||||
void* buf = taosMemoryCalloc(1, len);
|
int32_t len = doStreamStateEncodeOpState(NULL, 0, pOperator, true);
|
||||||
void* pBuf = buf;
|
void* buf = taosMemoryCalloc(1, len);
|
||||||
len = doStreamStateEncodeOpState(&pBuf, len, pOperator, true);
|
void* pBuf = buf;
|
||||||
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME,
|
len = doStreamStateEncodeOpState(&pBuf, len, pOperator, true);
|
||||||
strlen(STREAM_STATE_OP_CHECKPOINT_NAME), buf, len);
|
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME,
|
||||||
taosMemoryFree(buf);
|
strlen(STREAM_STATE_OP_CHECKPOINT_NAME), buf, len);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
saveStreamOperatorStateComplete(&pInfo->basic);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* buildStateResult(SOperatorInfo* pOperator) {
|
static SSDataBlock* buildStateResult(SOperatorInfo* pOperator) {
|
||||||
|
@ -3746,6 +3759,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
||||||
|
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||||
pBlock->info.type == STREAM_CLEAR) {
|
pBlock->info.type == STREAM_CLEAR) {
|
||||||
|
@ -4069,6 +4083,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
pInfo->numOfDatapack++;
|
pInfo->numOfDatapack++;
|
||||||
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
||||||
|
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||||
pBlock->info.type == STREAM_CLEAR) {
|
pBlock->info.type == STREAM_CLEAR) {
|
||||||
|
@ -4465,6 +4480,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
pInfo->numOfDatapack++;
|
pInfo->numOfDatapack++;
|
||||||
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
||||||
|
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
|
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
|
||||||
pInfo->binfo.pRes->info.type = pBlock->info.type;
|
pInfo->binfo.pRes->info.type = pBlock->info.type;
|
||||||
|
|
|
@ -187,6 +187,8 @@ const char* nodesNodeName(ENodeType type) {
|
||||||
return "BalanceVgroupStmt";
|
return "BalanceVgroupStmt";
|
||||||
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
|
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
|
||||||
return "BalanceVgroupLeaderStmt";
|
return "BalanceVgroupLeaderStmt";
|
||||||
|
case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT:
|
||||||
|
return "BalanceVgroupLeaderStmt";
|
||||||
case QUERY_NODE_MERGE_VGROUP_STMT:
|
case QUERY_NODE_MERGE_VGROUP_STMT:
|
||||||
return "MergeVgroupStmt";
|
return "MergeVgroupStmt";
|
||||||
case QUERY_NODE_SHOW_DB_ALIVE_STMT:
|
case QUERY_NODE_SHOW_DB_ALIVE_STMT:
|
||||||
|
@ -7607,6 +7609,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
return TSDB_CODE_SUCCESS; // SBalanceVgroupStmt has no fields to serialize.
|
return TSDB_CODE_SUCCESS; // SBalanceVgroupStmt has no fields to serialize.
|
||||||
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
|
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
|
||||||
return TSDB_CODE_SUCCESS; // SBalanceVgroupLeaderStmt has no fields to serialize.
|
return TSDB_CODE_SUCCESS; // SBalanceVgroupLeaderStmt has no fields to serialize.
|
||||||
|
case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT:
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
case QUERY_NODE_MERGE_VGROUP_STMT:
|
case QUERY_NODE_MERGE_VGROUP_STMT:
|
||||||
return mergeVgroupStmtToJson(pObj, pJson);
|
return mergeVgroupStmtToJson(pObj, pJson);
|
||||||
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
|
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
|
||||||
|
@ -7953,7 +7957,9 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
|
||||||
case QUERY_NODE_BALANCE_VGROUP_STMT:
|
case QUERY_NODE_BALANCE_VGROUP_STMT:
|
||||||
return TSDB_CODE_SUCCESS; // SBalanceVgroupStmt has no fields to deserialize.
|
return TSDB_CODE_SUCCESS; // SBalanceVgroupStmt has no fields to deserialize.
|
||||||
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
|
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
|
||||||
return TSDB_CODE_SUCCESS; // SBalanceVgroupLeaderStmt has no fields to deserialize.
|
return TSDB_CODE_SUCCESS;
|
||||||
|
case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT:
|
||||||
|
return TSDB_CODE_SUCCESS; // SBalanceVgroupLeaderStmt has no fields to deserialize.
|
||||||
case QUERY_NODE_MERGE_VGROUP_STMT:
|
case QUERY_NODE_MERGE_VGROUP_STMT:
|
||||||
return jsonToMergeVgroupStmt(pJson, pObj);
|
return jsonToMergeVgroupStmt(pJson, pObj);
|
||||||
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
|
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
|
||||||
|
|
|
@ -473,6 +473,8 @@ SNode* nodesMakeNode(ENodeType type) {
|
||||||
return makeNode(type, sizeof(SBalanceVgroupStmt));
|
return makeNode(type, sizeof(SBalanceVgroupStmt));
|
||||||
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
|
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
|
||||||
return makeNode(type, sizeof(SBalanceVgroupLeaderStmt));
|
return makeNode(type, sizeof(SBalanceVgroupLeaderStmt));
|
||||||
|
case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT:
|
||||||
|
return makeNode(type, sizeof(SBalanceVgroupLeaderStmt));
|
||||||
case QUERY_NODE_MERGE_VGROUP_STMT:
|
case QUERY_NODE_MERGE_VGROUP_STMT:
|
||||||
return makeNode(type, sizeof(SMergeVgroupStmt));
|
return makeNode(type, sizeof(SMergeVgroupStmt));
|
||||||
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
|
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
|
||||||
|
@ -1161,6 +1163,7 @@ void nodesDestroyNode(SNode* pNode) {
|
||||||
case QUERY_NODE_RESUME_STREAM_STMT: // no pointer field
|
case QUERY_NODE_RESUME_STREAM_STMT: // no pointer field
|
||||||
case QUERY_NODE_BALANCE_VGROUP_STMT: // no pointer field
|
case QUERY_NODE_BALANCE_VGROUP_STMT: // no pointer field
|
||||||
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: // no pointer field
|
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: // no pointer field
|
||||||
|
case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT: // no pointer field
|
||||||
case QUERY_NODE_MERGE_VGROUP_STMT: // no pointer field
|
case QUERY_NODE_MERGE_VGROUP_STMT: // no pointer field
|
||||||
break;
|
break;
|
||||||
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
|
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
|
||||||
|
|
|
@ -274,6 +274,7 @@ SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId
|
||||||
SNode* createKillQueryStmt(SAstCreateContext* pCxt, const SToken* pQueryId);
|
SNode* createKillQueryStmt(SAstCreateContext* pCxt, const SToken* pQueryId);
|
||||||
SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt);
|
SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt);
|
||||||
SNode* createBalanceVgroupLeaderStmt(SAstCreateContext* pCxt, const SToken* pVgId);
|
SNode* createBalanceVgroupLeaderStmt(SAstCreateContext* pCxt, const SToken* pVgId);
|
||||||
|
SNode* createBalanceVgroupLeaderDBNameStmt(SAstCreateContext* pCxt, const SToken* pDbName);
|
||||||
SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2);
|
SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2);
|
||||||
SNode* createRedistributeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId, SNodeList* pDnodes);
|
SNode* createRedistributeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId, SNodeList* pDnodes);
|
||||||
SNode* createSplitVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId);
|
SNode* createSplitVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId);
|
||||||
|
|
|
@ -773,6 +773,7 @@ cmd ::= KILL COMPACT NK_INTEGER(A).
|
||||||
/************************************************ merge/redistribute/ vgroup ******************************************/
|
/************************************************ merge/redistribute/ vgroup ******************************************/
|
||||||
cmd ::= BALANCE VGROUP. { pCxt->pRootNode = createBalanceVgroupStmt(pCxt); }
|
cmd ::= BALANCE VGROUP. { pCxt->pRootNode = createBalanceVgroupStmt(pCxt); }
|
||||||
cmd ::= BALANCE VGROUP LEADER on_vgroup_id(A). { pCxt->pRootNode = createBalanceVgroupLeaderStmt(pCxt, &A); }
|
cmd ::= BALANCE VGROUP LEADER on_vgroup_id(A). { pCxt->pRootNode = createBalanceVgroupLeaderStmt(pCxt, &A); }
|
||||||
|
cmd ::= BALANCE VGROUP LEADER DATABASE db_name(A). { pCxt->pRootNode = createBalanceVgroupLeaderDBNameStmt(pCxt, &A); }
|
||||||
cmd ::= MERGE VGROUP NK_INTEGER(A) NK_INTEGER(B). { pCxt->pRootNode = createMergeVgroupStmt(pCxt, &A, &B); }
|
cmd ::= MERGE VGROUP NK_INTEGER(A) NK_INTEGER(B). { pCxt->pRootNode = createMergeVgroupStmt(pCxt, &A, &B); }
|
||||||
cmd ::= REDISTRIBUTE VGROUP NK_INTEGER(A) dnode_list(B). { pCxt->pRootNode = createRedistributeVgroupStmt(pCxt, &A, B); }
|
cmd ::= REDISTRIBUTE VGROUP NK_INTEGER(A) dnode_list(B). { pCxt->pRootNode = createRedistributeVgroupStmt(pCxt, &A, B); }
|
||||||
cmd ::= SPLIT VGROUP NK_INTEGER(A). { pCxt->pRootNode = createSplitVgroupStmt(pCxt, &A); }
|
cmd ::= SPLIT VGROUP NK_INTEGER(A). { pCxt->pRootNode = createSplitVgroupStmt(pCxt, &A); }
|
||||||
|
|
|
@ -2809,6 +2809,16 @@ SNode* createBalanceVgroupLeaderStmt(SAstCreateContext* pCxt, const SToken* pVgI
|
||||||
return (SNode*)pStmt;
|
return (SNode*)pStmt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SNode* createBalanceVgroupLeaderDBNameStmt(SAstCreateContext* pCxt, const SToken* pDbName){
|
||||||
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
|
SBalanceVgroupLeaderStmt* pStmt = (SBalanceVgroupLeaderStmt*)nodesMakeNode(QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT);
|
||||||
|
CHECK_OUT_OF_MEM(pStmt);
|
||||||
|
if (NULL != pDbName) {
|
||||||
|
COPY_STRING_FORM_ID_TOKEN(pStmt->dbName, pDbName);
|
||||||
|
}
|
||||||
|
return (SNode*)pStmt;
|
||||||
|
}
|
||||||
|
|
||||||
SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2) {
|
SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2) {
|
||||||
CHECK_PARSER_STATUS(pCxt);
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
SMergeVgroupStmt* pStmt = (SMergeVgroupStmt*)nodesMakeNode(QUERY_NODE_MERGE_VGROUP_STMT);
|
SMergeVgroupStmt* pStmt = (SMergeVgroupStmt*)nodesMakeNode(QUERY_NODE_MERGE_VGROUP_STMT);
|
||||||
|
|
|
@ -10578,6 +10578,7 @@ static int32_t translateBalanceVgroup(STranslateContext* pCxt, SBalanceVgroupStm
|
||||||
static int32_t translateBalanceVgroupLeader(STranslateContext* pCxt, SBalanceVgroupLeaderStmt* pStmt) {
|
static int32_t translateBalanceVgroupLeader(STranslateContext* pCxt, SBalanceVgroupLeaderStmt* pStmt) {
|
||||||
SBalanceVgroupLeaderReq req = {0};
|
SBalanceVgroupLeaderReq req = {0};
|
||||||
req.vgId = pStmt->vgId;
|
req.vgId = pStmt->vgId;
|
||||||
|
if(pStmt->dbName != NULL) strcpy(req.db, pStmt->dbName);
|
||||||
int32_t code =
|
int32_t code =
|
||||||
buildCmdMsg(pCxt, TDMT_MND_BALANCE_VGROUP_LEADER, (FSerializeFunc)tSerializeSBalanceVgroupLeaderReq, &req);
|
buildCmdMsg(pCxt, TDMT_MND_BALANCE_VGROUP_LEADER, (FSerializeFunc)tSerializeSBalanceVgroupLeaderReq, &req);
|
||||||
tFreeSBalanceVgroupLeaderReq(&req);
|
tFreeSBalanceVgroupLeaderReq(&req);
|
||||||
|
@ -11263,6 +11264,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
|
||||||
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
|
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
|
||||||
code = translateBalanceVgroupLeader(pCxt, (SBalanceVgroupLeaderStmt*)pNode);
|
code = translateBalanceVgroupLeader(pCxt, (SBalanceVgroupLeaderStmt*)pNode);
|
||||||
break;
|
break;
|
||||||
|
case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT:
|
||||||
|
code = translateBalanceVgroupLeader(pCxt, (SBalanceVgroupLeaderStmt*)pNode);
|
||||||
|
break;
|
||||||
case QUERY_NODE_MERGE_VGROUP_STMT:
|
case QUERY_NODE_MERGE_VGROUP_STMT:
|
||||||
code = translateMergeVgroup(pCxt, (SMergeVgroupStmt*)pNode);
|
code = translateMergeVgroup(pCxt, (SMergeVgroupStmt*)pNode);
|
||||||
break;
|
break;
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -246,11 +246,10 @@ static bool scanPathOptMayBeOptimized(SLogicNode* pNode) {
|
||||||
|
|
||||||
static bool scanPathOptShouldGetFuncs(SLogicNode* pNode) {
|
static bool scanPathOptShouldGetFuncs(SLogicNode* pNode) {
|
||||||
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
|
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
|
||||||
if (pNode->pParent && QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent)) {
|
if (!pNode->pParent || QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pNode->pParent) ||
|
||||||
if (WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType) return true;
|
WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType)
|
||||||
} else {
|
|
||||||
return !scanPathOptHaveNormalCol(((SPartitionLogicNode*)pNode)->pPartitionKeys);
|
return !scanPathOptHaveNormalCol(((SPartitionLogicNode*)pNode)->pPartitionKeys);
|
||||||
}
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode) &&
|
if ((QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode) &&
|
||||||
|
|
|
@ -1588,7 +1588,7 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) {
|
||||||
if (*dest == NULL) {
|
if (*dest == NULL) {
|
||||||
size_t size = sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len;
|
size_t size = sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len;
|
||||||
char* p = taosMemoryCalloc(1, size);
|
char* p = taosMemoryCalloc(1, size);
|
||||||
char* buf = p;
|
char* buf = p;
|
||||||
len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
|
len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
|
||||||
len += taosEncodeFixedI32((void**)&buf, key.len);
|
len += taosEncodeFixedI32((void**)&buf, key.len);
|
||||||
len += taosEncodeFixedI32((void**)&buf, key.rawLen);
|
len += taosEncodeFixedI32((void**)&buf, key.rawLen);
|
||||||
|
@ -2740,8 +2740,10 @@ int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
|
||||||
if (pKey->groupId == groupId) {
|
if (pKey->groupId == groupId) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
taosMemoryFree((void*)*pVal);
|
if (pVal != NULL) {
|
||||||
*pVal = NULL;
|
taosMemoryFree((void*)*pVal);
|
||||||
|
*pVal = NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -557,7 +557,6 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
|
||||||
const int32_t BATCH_LIMIT = 256;
|
const int32_t BATCH_LIMIT = 256;
|
||||||
|
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
int32_t numOfElems = listNEles(pSnapshot);
|
|
||||||
SListNode* pNode = NULL;
|
SListNode* pNode = NULL;
|
||||||
|
|
||||||
int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName);
|
int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName);
|
||||||
|
@ -589,8 +588,11 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
|
||||||
}
|
}
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
|
|
||||||
if (streamStateGetBatchSize(batch) > 0) {
|
int32_t numOfElems = streamStateGetBatchSize(batch);
|
||||||
|
if (numOfElems > 0) {
|
||||||
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
|
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
|
||||||
|
} else {
|
||||||
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
streamStateClearBatch(batch);
|
streamStateClearBatch(batch);
|
||||||
|
@ -609,6 +611,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
|
||||||
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
|
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_end:
|
||||||
streamStateDestroyBatch(batch);
|
streamStateDestroyBatch(batch);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -302,8 +302,8 @@ int transClearBuffer(SConnBuffer* buf);
|
||||||
int transDestroyBuffer(SConnBuffer* buf);
|
int transDestroyBuffer(SConnBuffer* buf);
|
||||||
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
|
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
|
||||||
bool transReadComplete(SConnBuffer* connBuf);
|
bool transReadComplete(SConnBuffer* connBuf);
|
||||||
int transResetBuffer(SConnBuffer* connBuf);
|
int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf);
|
||||||
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf);
|
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf);
|
||||||
|
|
||||||
int transSetConnOption(uv_tcp_t* stream, int keepalive);
|
int transSetConnOption(uv_tcp_t* stream, int keepalive);
|
||||||
|
|
||||||
|
|
|
@ -382,13 +382,18 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
|
|
||||||
STransMsgHead* pHead = NULL;
|
STransMsgHead* pHead = NULL;
|
||||||
|
|
||||||
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
|
int8_t resetBuf = conn->status == ConnAcquire ? 0 : 1;
|
||||||
|
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, resetBuf);
|
||||||
if (msgLen <= 0) {
|
if (msgLen <= 0) {
|
||||||
taosMemoryFree(pHead);
|
taosMemoryFree(pHead);
|
||||||
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
|
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (resetBuf == 0) {
|
||||||
|
tTrace("%s conn %p not reset read buf", transLabel(pTransInst), conn);
|
||||||
|
}
|
||||||
|
|
||||||
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
|
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
|
||||||
tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
|
tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
|
||||||
}
|
}
|
||||||
|
|
|
@ -126,7 +126,7 @@ int transClearBuffer(SConnBuffer* buf) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
|
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf) {
|
||||||
static const int HEADSIZE = sizeof(STransMsgHead);
|
static const int HEADSIZE = sizeof(STransMsgHead);
|
||||||
|
|
||||||
SConnBuffer* p = connBuf;
|
SConnBuffer* p = connBuf;
|
||||||
|
@ -137,7 +137,7 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
|
||||||
if (total >= HEADSIZE && !p->invalid) {
|
if (total >= HEADSIZE && !p->invalid) {
|
||||||
*buf = taosMemoryCalloc(1, total);
|
*buf = taosMemoryCalloc(1, total);
|
||||||
memcpy(*buf, p->buf, total);
|
memcpy(*buf, p->buf, total);
|
||||||
if (transResetBuffer(connBuf) < 0) {
|
if (transResetBuffer(connBuf, resetBuf) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -146,7 +146,7 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
|
||||||
return total;
|
return total;
|
||||||
}
|
}
|
||||||
|
|
||||||
int transResetBuffer(SConnBuffer* connBuf) {
|
int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) {
|
||||||
SConnBuffer* p = connBuf;
|
SConnBuffer* p = connBuf;
|
||||||
if (p->total < p->len) {
|
if (p->total < p->len) {
|
||||||
int left = p->len - p->total;
|
int left = p->len - p->total;
|
||||||
|
@ -159,8 +159,10 @@ int transResetBuffer(SConnBuffer* connBuf) {
|
||||||
p->total = 0;
|
p->total = 0;
|
||||||
p->len = 0;
|
p->len = 0;
|
||||||
if (p->cap > BUFFER_CAP) {
|
if (p->cap > BUFFER_CAP) {
|
||||||
p->cap = BUFFER_CAP;
|
if (resetBuf) {
|
||||||
p->buf = taosMemoryRealloc(p->buf, p->cap);
|
p->cap = BUFFER_CAP;
|
||||||
|
p->buf = taosMemoryRealloc(p->buf, p->cap);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ASSERTS(0, "invalid read from sock buf");
|
ASSERTS(0, "invalid read from sock buf");
|
||||||
|
|
|
@ -342,11 +342,15 @@ static bool uvHandleReq(SSvrConn* pConn) {
|
||||||
|
|
||||||
STransMsgHead* pHead = NULL;
|
STransMsgHead* pHead = NULL;
|
||||||
|
|
||||||
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead);
|
int8_t resetBuf = pConn->status == ConnAcquire ? 0 : 1;
|
||||||
|
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead, resetBuf);
|
||||||
if (msgLen <= 0) {
|
if (msgLen <= 0) {
|
||||||
tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn);
|
tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
if (resetBuf == 0) {
|
||||||
|
tTrace("%s conn %p not reset read buf", transLabel(pTransInst), pConn);
|
||||||
|
}
|
||||||
|
|
||||||
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
|
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
|
||||||
tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn);
|
tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn);
|
||||||
|
@ -676,7 +680,8 @@ static FORCE_INLINE void destroySmsg(SSvrMsg* smsg) {
|
||||||
taosMemoryFree(smsg);
|
taosMemoryFree(smsg);
|
||||||
}
|
}
|
||||||
static FORCE_INLINE void destroySmsgWrapper(void* smsg, void* param) { destroySmsg((SSvrMsg*)smsg); }
|
static FORCE_INLINE void destroySmsgWrapper(void* smsg, void* param) { destroySmsg((SSvrMsg*)smsg); }
|
||||||
static void destroyAllConn(SWorkThrd* pThrd) {
|
|
||||||
|
static void destroyAllConn(SWorkThrd* pThrd) {
|
||||||
tTrace("thread %p destroy all conn ", pThrd);
|
tTrace("thread %p destroy all conn ", pThrd);
|
||||||
while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
|
while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
|
||||||
queue* h = QUEUE_HEAD(&pThrd->conn);
|
queue* h = QUEUE_HEAD(&pThrd->conn);
|
||||||
|
|
|
@ -11,7 +11,7 @@
|
||||||
# army-test
|
# army-test
|
||||||
#
|
#
|
||||||
,,y,army,./pytest.sh python3 ./test.py -f enterprise/multi-level/mlevel_basic.py -N 3 -L 3 -D 2
|
,,y,army,./pytest.sh python3 ./test.py -f enterprise/multi-level/mlevel_basic.py -N 3 -L 3 -D 2
|
||||||
,,y,army,./pytest.sh python3 ./test.py -f enterprise/s3/s3Basic.py -N 3
|
,,n,army,python3 ./test.py -f enterprise/s3/s3Basic.py -N 3
|
||||||
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/snapshot.py -N 3 -L 3 -D 2
|
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/snapshot.py -N 3 -L 3 -D 2
|
||||||
,,y,army,./pytest.sh python3 ./test.py -f community/query/function/test_func_elapsed.py
|
,,y,army,./pytest.sh python3 ./test.py -f community/query/function/test_func_elapsed.py
|
||||||
,,y,army,./pytest.sh python3 ./test.py -f community/query/test_join.py
|
,,y,army,./pytest.sh python3 ./test.py -f community/query/test_join.py
|
||||||
|
|
|
@ -22,7 +22,7 @@ class TDTestCase:
|
||||||
self.vgroups = 4
|
self.vgroups = 4
|
||||||
self.ctbNum = 10
|
self.ctbNum = 10
|
||||||
self.rowsPerTbl = 10000
|
self.rowsPerTbl = 10000
|
||||||
self.duraion = '1h'
|
self.duraion = '1d'
|
||||||
|
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
|
@ -33,7 +33,7 @@ class TDTestCase:
|
||||||
if dropFlag == 1:
|
if dropFlag == 1:
|
||||||
tsql.execute("drop database if exists %s"%(dbName))
|
tsql.execute("drop database if exists %s"%(dbName))
|
||||||
|
|
||||||
tsql.execute("create database if not exists %s vgroups %d replica %d duration %s"%(dbName, vgroups, replica, duration))
|
tsql.execute("create database if not exists %s vgroups %d replica %d duration %s stt_trigger 1"%(dbName, vgroups, replica, duration))
|
||||||
tdLog.debug("complete to create database %s"%(dbName))
|
tdLog.debug("complete to create database %s"%(dbName))
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -266,11 +266,11 @@ class TDTestCase:
|
||||||
#'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(30m)' % (col_name, col_name),
|
#'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(30m)' % (col_name, col_name),
|
||||||
#'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1h)' % (col_name, col_name),
|
#'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1h)' % (col_name, col_name),
|
||||||
|
|
||||||
'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1s)' % (col_name, col_name, col_name),
|
'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30d)' % (col_name, col_name, col_name),
|
||||||
#'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30s)' % (col_name, col_name, col_name),
|
#'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30s)' % (col_name, col_name, col_name),
|
||||||
#'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1m)' % (col_name, col_name, col_name),
|
#'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1m)' % (col_name, col_name, col_name),
|
||||||
#'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30m)' % (col_name, col_name, col_name),
|
#'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30m)' % (col_name, col_name, col_name),
|
||||||
#'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1h)' % (col_name, col_name, col_name),
|
'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1h)' % (col_name, col_name, col_name),
|
||||||
|
|
||||||
'select _wstart as ts, count(*), tbname as a, %s from meters partition by %s, tbname interval(1s)' % (col_name, col_name),
|
'select _wstart as ts, count(*), tbname as a, %s from meters partition by %s, tbname interval(1s)' % (col_name, col_name),
|
||||||
'select _wstart as ts, count(*), t1 as a, %s from meters partition by %s, t1 interval(1s)' % (col_name, col_name),
|
'select _wstart as ts, count(*), t1 as a, %s from meters partition by %s, t1 interval(1s)' % (col_name, col_name),
|
||||||
|
@ -317,6 +317,7 @@ class TDTestCase:
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.prepareTestEnv()
|
self.prepareTestEnv()
|
||||||
|
tdSql.execute('flush database test')
|
||||||
#time.sleep(99999999)
|
#time.sleep(99999999)
|
||||||
self.test_sort_for_partition_hint()
|
self.test_sort_for_partition_hint()
|
||||||
self.test_sort_for_partition_res()
|
self.test_sort_for_partition_res()
|
||||||
|
|
|
@ -37,15 +37,15 @@ void simLogSql(char *sql, bool useSharp) {
|
||||||
|
|
||||||
char *simParseHostName(char *varName) {
|
char *simParseHostName(char *varName) {
|
||||||
static char hostName[140];
|
static char hostName[140];
|
||||||
//#ifdef WINDOWS
|
//#ifdef WINDOWS
|
||||||
// hostName[0] = '\"';
|
// hostName[0] = '\"';
|
||||||
// taosGetFqdn(&hostName[1]);
|
// taosGetFqdn(&hostName[1]);
|
||||||
// int strEndIndex = strlen(hostName);
|
// int strEndIndex = strlen(hostName);
|
||||||
// hostName[strEndIndex] = '\"';
|
// hostName[strEndIndex] = '\"';
|
||||||
// hostName[strEndIndex + 1] = '\0';
|
// hostName[strEndIndex + 1] = '\0';
|
||||||
//#else
|
//#else
|
||||||
sprintf(hostName, "%s", "localhost");
|
sprintf(hostName, "%s", "localhost");
|
||||||
//#endif
|
//#endif
|
||||||
return hostName;
|
return hostName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -276,12 +276,16 @@ int32_t simExecuteExpression(SScript *script, char *exp) {
|
||||||
if (op1[0] == '=') {
|
if (op1[0] == '=') {
|
||||||
strcpy(simGetVariable(script, var1 + 1, var1Len - 1), t3);
|
strcpy(simGetVariable(script, var1 + 1, var1Len - 1), t3);
|
||||||
} else if (op1[0] == '<') {
|
} else if (op1[0] == '<') {
|
||||||
val0 = atoi(t0);
|
int64_t val0 = atoll(t0);
|
||||||
val1 = atoi(t3);
|
int64_t val1 = atoll(t3);
|
||||||
|
// val0 = atoi(t0);
|
||||||
|
// val1 = atoi(t3);
|
||||||
if (val0 >= val1) result = -1;
|
if (val0 >= val1) result = -1;
|
||||||
} else if (op1[0] == '>') {
|
} else if (op1[0] == '>') {
|
||||||
val0 = atoi(t0);
|
int64_t val0 = atoll(t0);
|
||||||
val1 = atoi(t3);
|
int64_t val1 = atoll(t3);
|
||||||
|
// val0 = atoi(t0);
|
||||||
|
// val1 = atoi(t3);
|
||||||
if (val0 <= val1) result = -1;
|
if (val0 <= val1) result = -1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -378,16 +382,14 @@ bool simExecuteRunBackCmd(SScript *script, char *option) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void simReplaceDirSep (char *buf){
|
void simReplaceDirSep(char *buf) {
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
int i=0;
|
int i = 0;
|
||||||
while(buf[i] != '\0')
|
while (buf[i] != '\0') {
|
||||||
{
|
if (buf[i] == '/') {
|
||||||
if(buf[i] == '/')
|
buf[i] = '\\';
|
||||||
{
|
}
|
||||||
buf[i] = '\\';
|
i++;
|
||||||
}
|
|
||||||
i++;
|
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
@ -505,7 +507,7 @@ bool simExecuteSystemContentCmd(SScript *script, char *option) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool simExecuteSetBIModeCmd(SScript *script, char *option) {
|
bool simExecuteSetBIModeCmd(SScript *script, char *option) {
|
||||||
char buf[1024];
|
char buf[1024];
|
||||||
|
|
||||||
simVisuallizeOption(script, option, buf);
|
simVisuallizeOption(script, option, buf);
|
||||||
option = buf;
|
option = buf;
|
||||||
|
|
Loading…
Reference in New Issue