Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/tsdb_last

This commit is contained in:
Hongze Cheng 2022-08-03 12:19:28 +00:00
commit 7d1f2b76cf
62 changed files with 2635 additions and 2201 deletions

View File

@ -2,7 +2,7 @@
# taosadapter
ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
GIT_TAG df8678f
GIT_TAG 88d26c3
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 2a2def1
GIT_TAG 58f58ee
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -176,18 +176,6 @@ Note: 由于 SHOW 语句已经被开发者熟悉和广泛使用,所以它们
| 5 | tag_type | BINARY(64) | tag 的类型 |
| 6 | tag_value | BINARY(16384) | tag 的值 |
## USER_STREAMS
提供用户创建的流计算的相关信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :---------: | ------------ | --------------------------- |
| 1 | stream_name | BINARY(192) | 流计算名称 |
| 2 | user_name | BINARY(23) | 创建流计算的用户 |
| 3 | dest_table | BINARY(192) | 流计算写入的目标表 |
| 4 | create_time | TIMESTAMP | 创建时间 |
| 5 | sql | BLOB | 创建流计算时提供的 SQL 语句 |
## INS_USERS
提供系统中创建的用户的相关信息。
@ -200,27 +188,44 @@ Note: 由于 SHOW 语句已经被开发者熟悉和广泛使用,所以它们
## INS_GRANTS
TODO
提供企业版授权的相关信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :---------: | ------------ | -------------------------------------------------- |
| 1 | version | BINARY(9) | 企业版授权说明official(官方授权的)/trial(试用的) |
| 2 | cpu_cores | BINARY(9) | 授权使用的 CPU 核心数量 |
| 3 | dnodes | BINARY(10) | 授权使用的 dnode 节点数量 |
| 4 | streams | BINARY(10) | 授权创建的流数量 |
| 5 | users | BINARY(10) | 授权创建的用户数量 |
| 6 | accounts | BINARY(10) | 授权创建的帐户数量 |
| 7 | storage | BINARY(21) | 授权使用的存储空间大小 |
| 8 | connections | BINARY(21) | 授权使用的客户端连接数量 |
| 9 | databases | BINARY(11) | 授权使用的数据库数量 |
| 10 | speed | BINARY(9) | 授权使用的数据点每秒写入数量 |
| 11 | querytime | BINARY(9) | 授权使用的查询总时长 |
| 12 | timeseries | BINARY(21) | 授权使用的测点数量 |
| 13 | expired | BINARY(5) | 是否到期true到期false未到期 |
| 14 | expire_time | BINARY(19) | 试用期到期时间 |
## INS_VGROUPS
系统中所有 vgroups 的信息。
| # | **列名** | **数据类型** | **说明** |
| --- | :-------: | ------------ | ---------------------------- |
| 1 | vgroup_id | INT | vgroup id |
| 2 | db_name | BINARY(32) | 数据库名 |
| 3 | tables | INT | 此 vgroup 内有多少表 |
| 4 | status | BINARY(10) | 此 vgroup 的状态 |
| 5 | v1_dnode | INT | 第一个成员所在的 dnode 的 id |
| 6 | v1_status | BINARY(10) | 第一个成员的状态 |
| 7 | v2_dnode | INT | 第二个成员所在的 dnode 的 id |
| 8 | v2_status | BINARY(10) | 第二个成员的状态 |
| 9 | v3_dnode | INT | 第三个成员所在的 dnode 的 id |
| 10 | v3_status | BINARY(10) | 第三个成员的状态 |
| 11 | nfiles | INT | TODO |
| 12 | file_size | INT | TODO |
| 13 | tsma | TINYINT | TODO |
| # | **列名** | **数据类型** | **说明** |
| --- | :-------: | ------------ | ------------------------------------------------------ |
| 1 | vgroup_id | INT | vgroup id |
| 2 | db_name | BINARY(32) | 数据库名 |
| 3 | tables | INT | 此 vgroup 内有多少表 |
| 4 | status | BINARY(10) | 此 vgroup 的状态 |
| 5 | v1_dnode | INT | 第一个成员所在的 dnode 的 id |
| 6 | v1_status | BINARY(10) | 第一个成员的状态 |
| 7 | v2_dnode | INT | 第二个成员所在的 dnode 的 id |
| 8 | v2_status | BINARY(10) | 第二个成员的状态 |
| 9 | v3_dnode | INT | 第三个成员所在的 dnode 的 id |
| 10 | v3_status | BINARY(10) | 第三个成员的状态 |
| 11 | nfiles | INT | 此 vgroup 中数据/元数据文件的数量 |
| 12 | file_size | INT | 此 vgroup 中数据/元数据文件的大小 |
| 13 | tsma | TINYINT | 此 vgroup 是否专用于 Time-range-wise SMA1: 是, 0: 否 |
## INS_CONFIGS

View File

@ -49,9 +49,9 @@ int32_t grantCheck(EGrantType grant);
#ifndef GRANTS_CFG
#define GRANTS_SCHEMA static const SSysDbTableSchema grantsSchema[] = { \
{.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expire time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "storage(GB)", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "storage", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "databases", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "users", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
@ -59,8 +59,8 @@ int32_t grantCheck(EGrantType grant);
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "connections", .bytes = 11 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "streams", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "cpu cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "speed(PPS)", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "cpu_cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "speed", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "querytime", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
}
#define GRANT_CFG_ADD

View File

@ -337,8 +337,10 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p
}
static FORCE_INLINE void tDeleteSSchemaWrapper(SSchemaWrapper* pSchemaWrapper) {
taosMemoryFree(pSchemaWrapper->pSchema);
taosMemoryFree(pSchemaWrapper);
if (pSchemaWrapper) {
taosMemoryFree(pSchemaWrapper->pSchema);
taosMemoryFree(pSchemaWrapper);
}
}
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {

View File

@ -253,7 +253,8 @@ typedef struct SShowCreateTableStmt {
ENodeType type;
char dbName[TSDB_DB_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
void* pCfg; // STableCfg
void* pDbCfg; // SDbCfgInfo
void* pTableCfg; // STableCfg
} SShowCreateTableStmt;
typedef struct SShowTableDistributedStmt {
@ -282,6 +283,7 @@ typedef struct SCreateIndexStmt {
ENodeType type;
EIndexType indexType;
bool ignoreExists;
char indexDbName[TSDB_DB_NAME_LEN];
char indexName[TSDB_INDEX_NAME_LEN];
char dbName[TSDB_DB_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
@ -292,6 +294,7 @@ typedef struct SCreateIndexStmt {
typedef struct SDropIndexStmt {
ENodeType type;
bool ignoreNotExists;
char indexDbName[TSDB_DB_NAME_LEN];
char indexName[TSDB_INDEX_NAME_LEN];
} SDropIndexStmt;

View File

@ -114,6 +114,7 @@ typedef struct SAggLogicNode {
SNodeList* pAggFuncs;
bool hasLastRow;
bool hasTimeLineFunc;
bool onlyHasKeepOrderFunc;
} SAggLogicNode;
typedef struct SProjectLogicNode {
@ -552,6 +553,8 @@ typedef struct SQueryPlan {
void nodesWalkPhysiPlan(SNode* pNode, FNodeWalker walker, void* pContext);
const char* dataOrderStr(EDataOrderLevel order);
#ifdef __cplusplus
}
#endif

View File

@ -269,6 +269,7 @@ typedef struct SSelectStmt {
bool hasInterpFunc;
bool hasLastRowFunc;
bool hasTimeLineFunc;
bool onlyHasKeepOrderFunc;
bool groupSort;
} SSelectStmt;

View File

@ -608,6 +608,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151)
#define TSDB_CODE_RSMA_QTASKINFO_CREATE TAOS_DEF_ERROR_CODE(0, 0x3152)
#define TSDB_CODE_RSMA_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x3153)
#define TSDB_CODE_RSMA_REMOVE_EXISTS TAOS_DEF_ERROR_CODE(0, 0x3154)
//index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)

0
packaging/docker/bin/entrypoint.sh Normal file → Executable file
View File

0
packaging/docker/bin/env-to-cfg Normal file → Executable file
View File

0
packaging/docker/bin/taos-check Normal file → Executable file
View File

0
packaging/docker/dockerManifest.sh Normal file → Executable file
View File

0
packaging/docker/dockerbuild.sh Normal file → Executable file
View File

0
packaging/docker/dockerbuildi.sh Normal file → Executable file
View File

View File

@ -504,15 +504,16 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
pMsgSendInfo->requestId = generateRequestId();
pMsgSendInfo->requestObjRefId = 0;
pMsgSendInfo->param = pParam;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->fp = tmqCommitCb2;
pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;
// send msg
atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
atomic_add_fetch_32(&pParamSet->totalRspNum, 1);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
pParamSet->waitingRspNum++;
pParamSet->totalRspNum++;
return 0;
}
@ -2196,7 +2197,7 @@ static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray*
cJSON* tvalue = NULL;
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
char* buf = taosMemoryCalloc(pTagVal->nData + 3, 1);
if(!buf) goto end;
if (!buf) goto end;
dataConverToStr(buf, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL);
tvalue = cJSON_CreateString(buf);
taosMemoryFree(buf);
@ -2506,8 +2507,8 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
launchQueryImpl(pRequest, &pQuery, true, NULL);
if(pRequest->code == TSDB_CODE_SUCCESS){
SCatalog* pCatalog = NULL;
if (pRequest->code == TSDB_CODE_SUCCESS) {
SCatalog* pCatalog = NULL;
catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
catalogRemoveTableMeta(pCatalog, &tableName);
}
@ -2575,8 +2576,8 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
launchQueryImpl(pRequest, &pQuery, true, NULL);
if(pRequest->code == TSDB_CODE_SUCCESS){
SCatalog* pCatalog = NULL;
if (pRequest->code == TSDB_CODE_SUCCESS) {
SCatalog* pCatalog = NULL;
catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
catalogRemoveTableMeta(pCatalog, &tableName);
}
@ -2695,7 +2696,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
}
launchQueryImpl(pRequest, pQuery, true, NULL);
if (pRequest->code == TSDB_CODE_SUCCESS){
if (pRequest->code == TSDB_CODE_SUCCESS) {
removeMeta(pTscObj, pRequest->tableList);
}
@ -2812,7 +2813,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
}
launchQueryImpl(pRequest, pQuery, true, NULL);
if (pRequest->code == TSDB_CODE_SUCCESS){
if (pRequest->code == TSDB_CODE_SUCCESS) {
removeMeta(pTscObj, pRequest->tableList);
}
code = pRequest->code;
@ -2827,7 +2828,7 @@ end:
// delete from db.tabl where .. -> delete from tabl where ..
// delete from db .tabl where .. -> delete from tabl where ..
//static void getTbName(char *sql){
// static void getTbName(char *sql){
// char *ch = sql;
//
// bool inBackQuote = false;
@ -2858,9 +2859,9 @@ end:
//}
static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
SDeleteRes req = {0};
SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS;
SDeleteRes req = {0};
SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS;
// decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
@ -2871,13 +2872,14 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
goto end;
}
// getTbName(req.tableFName);
// getTbName(req.tableFName);
char sql[256] = {0};
sprintf(sql, "delete from `%s` where `%s` >= %" PRId64" and `%s` <= %" PRId64, req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
sprintf(sql, "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName, req.tsColName,
req.skey, req.tsColName, req.ekey);
printf("delete sql:%s\n", sql);
TAOS_RES* res = taos_query(taos, sql);
SRequestObj *pRequest = (SRequestObj *)res;
TAOS_RES* res = taos_query(taos, sql);
SRequestObj* pRequest = (SRequestObj*)res;
code = pRequest->code;
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
code = TSDB_CODE_SUCCESS;
@ -2985,9 +2987,9 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
code = TSDB_CODE_SUCCESS;
}
if(pRequest->code == TSDB_CODE_SUCCESS){
if (pRequest->code == TSDB_CODE_SUCCESS) {
SExecResult* pRes = &pRequest->body.resInfo.execRes;
if(pRes->res != NULL){
if (pRes->res != NULL) {
code = handleAlterTbExecRes(pRes->res, pCatalog);
}
}
@ -3001,23 +3003,23 @@ end:
return code;
}
typedef struct{
typedef struct {
SVgroupInfo vg;
void *data;
}VgData;
void* data;
} VgData;
static void destroyVgHash(void* data) {
VgData* vgData = (VgData*)data;
taosMemoryFreeClear(vgData->data);
}
int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
int32_t code = TSDB_CODE_SUCCESS;
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
int32_t code = TSDB_CODE_SUCCESS;
STableMeta* pTableMeta = NULL;
SQuery *pQuery = NULL;
SQuery* pQuery = NULL;
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
if(!pRequest){
if (!pRequest) {
uError("WriteRaw:createRequest error request is null");
code = terrno;
goto end;
@ -3033,9 +3035,9 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
strcpy(pName.dbname, pRequest->pDb);
strcpy(pName.tname, tbname);
struct SCatalog *pCatalog = NULL;
struct SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if(code != TSDB_CODE_SUCCESS){
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw: get gatlog error");
goto end;
}
@ -3060,17 +3062,17 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
}
uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid);
uint64_t uid = pTableMeta->uid;
int32_t numOfCols = pTableMeta->tableInfo.numOfColumns;
int32_t numOfCols = pTableMeta->tableInfo.numOfColumns;
uint16_t fLen = 0;
int32_t rowSize = 0;
int16_t nVar = 0;
int32_t rowSize = 0;
int16_t nVar = 0;
for (int i = 0; i < numOfCols; i++) {
SSchema *schema = pTableMeta->schema + i;
SSchema* schema = pTableMeta->schema + i;
fLen += TYPE_BYTES[schema->type];
rowSize += schema->bytes;
if(IS_VAR_DATA_TYPE(schema->type)){
nVar ++;
if (IS_VAR_DATA_TYPE(schema->type)) {
nVar++;
}
}
@ -3079,22 +3081,22 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
int32_t schemaLen = 0;
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
int32_t totalLen = sizeof(SSubmitReq) + submitLen;
int32_t totalLen = sizeof(SSubmitReq) + submitLen;
SSubmitReq* subReq = taosMemoryCalloc(1, totalLen);
SSubmitBlk* blk = POINTER_SHIFT(subReq, sizeof(SSubmitReq));
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
SRowBuilder rb = {0};
tdSRowInit(&rb, pTableMeta->sversion);
tdSRowSetTpInfo(&rb, numOfCols, fLen);
int32_t dataLen = 0;
char* pStart = pData + sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t));
char* pStart = pData + sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t));
int32_t* colLength = (int32_t*)pStart;
pStart += sizeof(int32_t) * numOfCols;
SResultColumn *pCol = taosMemoryCalloc(numOfCols, sizeof(SResultColumn));
SResultColumn* pCol = taosMemoryCalloc(numOfCols, sizeof(SResultColumn));
for (int32_t i = 0; i < numOfCols; ++i) {
if (IS_VAR_DATA_TYPE(pTableMeta->schema[i].type)) {
@ -3113,7 +3115,7 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
tdSRowResetBuf(&rb, rowData);
int32_t offset = 0;
for (int32_t k = 0; k < numOfCols; k++) {
const SSchema* pColumn = &pTableMeta->schema[k];
const SSchema* pColumn = &pTableMeta->schema[k];
if (IS_VAR_DATA_TYPE(pColumn->type)) {
if (pCol[k].offset[j] != -1) {
@ -3159,17 +3161,17 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->haveResultSet = false;
pQuery->msgType = TDMT_VND_SUBMIT;
pQuery->pRoot = (SNode *)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
if (NULL == pQuery->pRoot) {
uError("create pQuery->pRoot error");
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
SVnodeModifOpStmt *nodeStmt = (SVnodeModifOpStmt *)(pQuery->pRoot);
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
SVgDataBlocks *dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
if (NULL == dst) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto end;
@ -3183,7 +3185,7 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
subReq->header.contLen = htonl(subReq->length);
subReq->length = htonl(subReq->length);
subReq->numOfBlocks = htonl(subReq->numOfBlocks);
subReq = NULL; // no need free
subReq = NULL; // no need free
taosArrayPush(nodeStmt->pDataBlocks, &dst);
launchQueryImpl(pRequest, pQuery, true, NULL);
@ -3195,16 +3197,16 @@ end:
return code;
}
static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
int32_t code = TSDB_CODE_SUCCESS;
SHashObj *pVgHash = NULL;
SQuery *pQuery = NULL;
static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
int32_t code = TSDB_CODE_SUCCESS;
SHashObj* pVgHash = NULL;
SQuery* pQuery = NULL;
SMqRspObj rspObj = {0};
SDecoder decoder = {0};
SDecoder decoder = {0};
terrno = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
if(!pRequest){
if (!pRequest) {
uError("WriteRaw:createRequest error request is null");
return terrno;
}
@ -3214,7 +3216,7 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
tDecoderInit(&decoder, data, dataLen);
code = tDecodeSMqDataRsp(&decoder, &rspObj.rsp);
if (code != 0){
if (code != 0) {
uError("WriteRaw:decode smqDataRsp error");
code = TSDB_CODE_INVALID_MSG;
goto end;
@ -3228,9 +3230,9 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
taosHashSetFreeFp(pVgHash, destroyVgHash);
struct SCatalog *pCatalog = NULL;
struct SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if(code != TSDB_CODE_SUCCESS){
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw: get gatlog error");
goto end;
}
@ -3252,20 +3254,20 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
setResSchemaInfo(&rspObj.resInfo, pSW->pSchema, pSW->nCols);
code = setQueryResultFromRsp(&rspObj.resInfo, pRetrieve, false, false);
if(code != TSDB_CODE_SUCCESS){
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw: setQueryResultFromRsp error");
goto end;
}
uint16_t fLen = 0;
int32_t rowSize = 0;
int16_t nVar = 0;
int32_t rowSize = 0;
int16_t nVar = 0;
for (int i = 0; i < pSW->nCols; i++) {
SSchema *schema = pSW->pSchema + i;
SSchema* schema = pSW->pSchema + i;
fLen += TYPE_BYTES[schema->type];
rowSize += schema->bytes;
if(IS_VAR_DATA_TYPE(schema->type)){
nVar ++;
if (IS_VAR_DATA_TYPE(schema->type)) {
nVar++;
}
}
@ -3276,7 +3278,7 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter);
if(!tbName){
if (!tbName) {
uError("WriteRaw: tbname is null");
code = TSDB_CODE_TMQ_INVALID_MSG;
goto end;
@ -3296,12 +3298,12 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
SSubmitReq* subReq = NULL;
SSubmitBlk* blk = NULL;
void *hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId));
if(hData){
void* hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId));
if (hData) {
vgData = *(VgData*)hData;
int32_t totalLen = ((SSubmitReq*)(vgData.data))->length + submitLen;
void *tmp = taosMemoryRealloc(vgData.data, totalLen);
void* tmp = taosMemoryRealloc(vgData.data, totalLen);
if (tmp == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto end;
@ -3310,15 +3312,15 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
((VgData*)hData)->data = tmp;
subReq = (SSubmitReq*)(vgData.data);
blk = POINTER_SHIFT(vgData.data, subReq->length);
}else{
} else {
int32_t totalLen = sizeof(SSubmitReq) + submitLen;
void *tmp = taosMemoryCalloc(1, totalLen);
void* tmp = taosMemoryCalloc(1, totalLen);
if (tmp == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto end;
}
vgData.data = tmp;
taosHashPut(pVgHash, (const char *)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char *)&vgData, sizeof(vgData));
taosHashPut(pVgHash, (const char*)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char*)&vgData, sizeof(vgData));
subReq = (SSubmitReq*)(vgData.data);
subReq->length = sizeof(SSubmitReq);
subReq->numOfBlocks = 0;
@ -3336,7 +3338,7 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
uint64_t uid = pTableMeta->uid;
taosMemoryFreeClear(pTableMeta);
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
SRowBuilder rb = {0};
@ -3352,12 +3354,12 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
int32_t offset = 0;
for (int32_t k = 0; k < pSW->nCols; k++) {
const SSchema* pColumn = &pSW->pSchema[k];
char *data = rspObj.resInfo.row[k];
const SSchema* pColumn = &pSW->pSchema[k];
char* data = rspObj.resInfo.row[k];
if (!data) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
} else {
if(IS_VAR_DATA_TYPE(pColumn->type)){
if (IS_VAR_DATA_TYPE(pColumn->type)) {
data -= VARSTR_HEADER_SIZE;
}
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
@ -3389,21 +3391,21 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->haveResultSet = false;
pQuery->msgType = TDMT_VND_SUBMIT;
pQuery->pRoot = (SNode *)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
if (NULL == pQuery->pRoot) {
uError("create pQuery->pRoot error");
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
SVnodeModifOpStmt *nodeStmt = (SVnodeModifOpStmt *)(pQuery->pRoot);
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
int32_t numOfVg = taosHashGetSize(pVgHash);
nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
VgData *vData = (VgData *)taosHashIterate(pVgHash, NULL);
VgData* vData = (VgData*)taosHashIterate(pVgHash, NULL);
while (vData) {
SVgDataBlocks *dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
if (NULL == dst) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto end;
@ -3413,14 +3415,14 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
dst->numOfTables = subReq->numOfBlocks;
dst->size = subReq->length;
dst->pData = (char*)subReq;
vData->data = NULL; // no need free
vData->data = NULL; // no need free
subReq->header.vgId = htonl(dst->vg.vgId);
subReq->version = htonl(1);
subReq->header.contLen = htonl(subReq->length);
subReq->length = htonl(subReq->length);
subReq->numOfBlocks = htonl(subReq->numOfBlocks);
taosArrayPush(nodeStmt->pDataBlocks, &dst);
vData = (VgData *)taosHashIterate(pVgHash, vData);
vData = (VgData*)taosHashIterate(pVgHash, vData);
}
launchQueryImpl(pRequest, pQuery, true, NULL);
@ -3459,8 +3461,8 @@ char* tmq_get_json_meta(TAOS_RES* res) {
void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); }
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data *raw) {
if (!raw || !res){
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
if (!raw || !res) {
return TSDB_CODE_INVALID_PARA;
}
if (TD_RES_TMQ_META(res)) {
@ -3468,8 +3470,8 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data *raw) {
raw->raw = pMetaRspObj->metaRsp.metaRsp;
raw->raw_len = pMetaRspObj->metaRsp.metaRspLen;
raw->raw_type = pMetaRspObj->metaRsp.resMsgType;
} else if(TD_RES_TMQ(res)){
SMqRspObj *rspObj = ((SMqRspObj*)res);
} else if (TD_RES_TMQ(res)) {
SMqRspObj* rspObj = ((SMqRspObj*)res);
int32_t len = 0;
int32_t code = 0;
@ -3478,7 +3480,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data *raw) {
return -1;
}
void *buf = taosMemoryCalloc(1, len);
void* buf = taosMemoryCalloc(1, len);
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, len);
tEncodeSMqDataRsp(&encoder, &rspObj->rsp);
@ -3494,31 +3496,31 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data *raw) {
}
void tmq_free_raw(tmq_raw_data raw) {
if (raw.raw_type == RES_TYPE__TMQ){
if (raw.raw_type == RES_TYPE__TMQ) {
taosMemoryFree(raw.raw);
}
}
int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw){
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
if (!taos) {
return TSDB_CODE_INVALID_PARA;
}
if(raw.raw_type == TDMT_VND_CREATE_STB) {
if (raw.raw_type == TDMT_VND_CREATE_STB) {
return taosCreateStb(taos, raw.raw, raw.raw_len);
}else if(raw.raw_type == TDMT_VND_ALTER_STB){
} else if (raw.raw_type == TDMT_VND_ALTER_STB) {
return taosCreateStb(taos, raw.raw, raw.raw_len);
}else if(raw.raw_type == TDMT_VND_DROP_STB){
} else if (raw.raw_type == TDMT_VND_DROP_STB) {
return taosDropStb(taos, raw.raw, raw.raw_len);
}else if(raw.raw_type == TDMT_VND_CREATE_TABLE){
} else if (raw.raw_type == TDMT_VND_CREATE_TABLE) {
return taosCreateTable(taos, raw.raw, raw.raw_len);
}else if(raw.raw_type == TDMT_VND_ALTER_TABLE){
} else if (raw.raw_type == TDMT_VND_ALTER_TABLE) {
return taosAlterTable(taos, raw.raw, raw.raw_len);
}else if(raw.raw_type == TDMT_VND_DROP_TABLE) {
} else if (raw.raw_type == TDMT_VND_DROP_TABLE) {
return taosDropTable(taos, raw.raw, raw.raw_len);
}else if(raw.raw_type == TDMT_VND_DELETE){
} else if (raw.raw_type == TDMT_VND_DELETE) {
return taosDeleteData(taos, raw.raw, raw.raw_len);
}else if(raw.raw_type == RES_TYPE__TMQ){
} else if (raw.raw_type == RES_TYPE__TMQ) {
return tmqWriteRaw(taos, raw.raw, raw.raw_len);
}
return TSDB_CODE_INVALID_PARA;

View File

@ -18,8 +18,8 @@
#include "tcompare.h"
#include "tconfig.h"
#include "tdatablock.h"
#include "tlog.h"
#include "tgrant.h"
#include "tlog.h"
GRANT_CFG_DECLARE;
@ -389,7 +389,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, 0) != 0) return -1;
tsNumOfVnodeStreamThreads = tsNumOfCores / 4;
tsNumOfVnodeStreamThreads = tsNumOfCores;
tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1;
@ -598,7 +598,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
return 0;
}
void taosLocalCfgForbiddenToChange(char* name, bool* forbidden) {
void taosLocalCfgForbiddenToChange(char *name, bool *forbidden) {
int32_t len = strlen(name);
char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0};
strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len));
@ -612,7 +612,6 @@ void taosLocalCfgForbiddenToChange(char* name, bool* forbidden) {
*forbidden = false;
}
int32_t taosSetCfg(SConfig *pCfg, char *name) {
int32_t len = strlen(name);
char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0};
@ -1114,12 +1113,12 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
const char *options[] = {
"dDebugFlag", "vDebugFlag", "mDebugFlag", "wDebugFlag", "sDebugFlag", "tsdbDebugFlag",
"tqDebugFlag", "fsDebugFlag", "udfDebugFlag", "smaDebugFlag", "idxDebugFlag", "tdbDebugFlag",
"tmrDebugFlag", "uDebugFlag", "smaDebugFlag", "rpcDebugFlag", "qDebugFlag", "metaDebugFlag",
"tmrDebugFlag", "uDebugFlag", "smaDebugFlag", "rpcDebugFlag", "qDebugFlag", "metaDebugFlag",
};
int32_t *optionVars[] = {
&dDebugFlag, &vDebugFlag, &mDebugFlag, &wDebugFlag, &sDebugFlag, &tsdbDebugFlag,
&tqDebugFlag, &fsDebugFlag, &udfDebugFlag, &smaDebugFlag, &idxDebugFlag, &tdbDebugFlag,
&tmrDebugFlag, &uDebugFlag, &smaDebugFlag, &rpcDebugFlag, &qDebugFlag, &metaDebugFlag,
&tmrDebugFlag, &uDebugFlag, &smaDebugFlag, &rpcDebugFlag, &qDebugFlag, &metaDebugFlag,
};
int32_t optionSize = tListLen(options);

View File

@ -146,8 +146,8 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) {
dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d", pHead->vgId, pMsg,
terrstr(), TMSG_INFO(pMsg->msgType), qtype);
dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d", pHead->vgId, pMsg, terrstr(),
TMSG_INFO(pMsg->msgType), qtype);
return terrno != 0 ? terrno : -1;
}

View File

@ -1011,6 +1011,11 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
goto _OVER;
}
if ((terrno = grantCheck(TSDB_GRANT_STABLE)) < 0) {
code = -1;
goto _OVER;
}
if (isAlter) {
bool needRsp = false;
SStbObj pDst = {0};

View File

@ -151,6 +151,11 @@ enum {
RSMA_ROLE_ITERATE = 4,
};
enum {
RSMA_RESTORE_REBOOT = 1,
RSMA_RESTORE_SYNC = 2,
};
void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
@ -227,7 +232,8 @@ void tdRemoveRSmaInfoBySuid(SSma *pSma, int64_t suid);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdProcessRSmaRestoreImpl(SSma *pSma);
int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer);
int32_t tdRsmaRestore(SSma *pSma, int8_t type, int64_t committedVer);
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);

View File

@ -123,7 +123,7 @@ int32_t smaOpen(SVnode *pVnode) {
}
// restore the rsma
if (rsmaRestore(pSma) < 0) {
if (tdRsmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed) < 0) {
goto _err;
}
}
@ -148,12 +148,14 @@ int32_t smaClose(SSma *pSma) {
/**
* @brief rsma env restore
*
* @param pSma
* @return int32_t
*
* @param pSma
* @param type
* @param committedVer
* @return int32_t
*/
static int32_t rsmaRestore(SSma *pSma) {
int32_t tdRsmaRestore(SSma *pSma, int8_t type, int64_t committedVer) {
ASSERT(VND_IS_RSMA(pSma->pVnode));
return tdProcessRSmaRestoreImpl(pSma);
return tdProcessRSmaRestoreImpl(pSma, type, committedVer);
}

View File

@ -41,12 +41,12 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId);
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter);
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter);
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem);
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed);
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed);
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer);
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) {
// adapt accordingly if definition of SRSmaInfo update
@ -80,7 +80,7 @@ void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName)
tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
}
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char* path, char *outputName) {
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char *path, char *outputName) {
tdGetVndFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
}
@ -319,9 +319,13 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
if (pRSmaInfo) {
ASSERT(0); // TODO: free original pRSmaInfo if exists abnormally
smaDebug("vgId:%d, rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid);
return TSDB_CODE_SUCCESS;
// TODO: free original pRSmaInfo if exists abnormally
tdFreeRSmaInfo(pSma, *(SRSmaInfo **)pRSmaInfo, true);
if (taosHashRemove(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)) < 0) {
terrno = TSDB_CODE_RSMA_REMOVE_EXISTS;
goto _err;
}
smaWarn("vgId:%d, remove the rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid);
}
// from write queue: single thead
@ -648,7 +652,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
pItem->taskInfo, suid);
if (qSetMultiStreamInput(pItem->taskInfo, pMsg, 1, inputType) < 0) { // INPUT__DATA_SUBMIT
smaError("vgId:%d, rsma % " PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
return TSDB_CODE_FAILED;
}
@ -872,24 +876,23 @@ _err:
return TSDB_CODE_FAILED;
}
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) {
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer) {
SVnode *pVnode = pSma->pVnode;
STFile tFile = {0};
char qTaskInfoFName[TSDB_FILENAME_LEN] = {0};
tdRSmaQTaskInfoGetFileName(TD_VID(pVnode), pVnode->state.committed, qTaskInfoFName);
tdRSmaQTaskInfoGetFileName(TD_VID(pVnode), qTaskFileVer, qTaskInfoFName);
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
goto _err;
}
if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) {
*committed = 0;
if (pVnode->state.committed > 0) {
smaWarn("vgId:%d, rsma restore for version %" PRIi64 ", not start as %s not exist", TD_VID(pVnode),
pVnode->state.committed, TD_TFILE_FULL_NAME(&tFile));
if (qTaskFileVer > 0) {
smaWarn("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", not start as %s not exist",
TD_VID(pVnode), type, qTaskFileVer, TD_TFILE_FULL_NAME(&tFile));
} else {
smaDebug("vgId:%d, rsma restore for version %" PRIi64 ", no need as %s not exist", TD_VID(pVnode),
pVnode->state.committed, TD_TFILE_FULL_NAME(&tFile));
smaDebug("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", no need as %s not exist", TD_VID(pVnode),
type, qTaskFileVer, TD_TFILE_FULL_NAME(&tFile));
}
return TSDB_CODE_SUCCESS;
}
@ -914,7 +917,7 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) {
goto _err;
}
if (tdRSmaQTaskInfoRestore(pSma, &fIter) < 0) {
if (tdRSmaQTaskInfoRestore(pSma, type, &fIter) < 0) {
tdRSmaQTaskInfoIterDestroy(&fIter);
tdCloseTFile(&tFile);
tdDestroyTFile(&tFile);
@ -925,13 +928,13 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) {
tdCloseTFile(&tFile);
tdDestroyTFile(&tFile);
// restored successfully from committed
*committed = pVnode->state.committed;
// restored successfully from committed or sync
smaInfo("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", qtaskinfo reload succeed", TD_VID(pVnode),
type, qTaskFileVer);
return TSDB_CODE_SUCCESS;
_err:
smaError("vgId:%d, rsma restore for version %" PRIi64 ", qtaskinfo reload failed since %s", TD_VID(pVnode),
pVnode->state.committed, terrstr());
smaError("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", qtaskinfo reload failed since %s",
TD_VID(pVnode), type, qTaskFileVer, terrstr());
return TSDB_CODE_FAILED;
}
@ -939,15 +942,14 @@ _err:
* @brief reload ts data from checkpoint
*
* @param pSma
* @param committed restore from committed version
* @return int32_t
*/
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed) {
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma) {
// NOTHING TODO: the data would be restored from the unified WAL replay procedure
return TSDB_CODE_SUCCESS;
}
int32_t tdProcessRSmaRestoreImpl(SSma *pSma) {
int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer) {
// step 1: iterate all stables to restore the rsma env
int64_t nTables = 0;
if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) {
@ -955,24 +957,24 @@ int32_t tdProcessRSmaRestoreImpl(SSma *pSma) {
}
if (nTables <= 0) {
smaDebug("vgId:%d, no need to restore rsma task since no tables", SMA_VID(pSma));
smaDebug("vgId:%d, no need to restore rsma task %" PRIi8 " since no tables", SMA_VID(pSma), type);
return TSDB_CODE_SUCCESS;
}
// step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore
int64_t committed = -1;
if (tdRSmaRestoreQTaskInfoReload(pSma, &committed) < 0) {
if (tdRSmaRestoreQTaskInfoReload(pSma, type, qtaskFileVer) < 0) {
goto _err;
}
// step 3: reload ts data from checkpoint
if (tdRSmaRestoreTSDataReload(pSma, committed) < 0) {
if (tdRSmaRestoreTSDataReload(pSma) < 0) {
goto _err;
}
smaInfo("vgId:%d, restore rsma task %" PRIi8 " from qtaskf %" PRIi64 " succeed", SMA_VID(pSma), type, qtaskFileVer);
return TSDB_CODE_SUCCESS;
_err:
smaError("vgId:%d, failed to restore rsma task since %s", SMA_VID(pSma), terrstr());
smaError("vgId:%d, restore rsma task %" PRIi8 "from qtaskf %" PRIi64 " failed since %s", SMA_VID(pSma), type,
qtaskFileVer, terrstr());
return TSDB_CODE_FAILED;
}
@ -1101,7 +1103,7 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isF
return TSDB_CODE_SUCCESS;
}
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) {
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter) {
while (1) {
// block iter
bool isFinish = false;
@ -1117,7 +1119,7 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) {
pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead);
if (qTaskInfoLenWithHead < RSMA_QTASKINFO_HEAD_LEN) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
smaError("vgId:%d, restore rsma qtaskinfo file %s failed since %s", SMA_VID(pSma),
smaError("vgId:%d, restore rsma task %" PRIi8 " from qtaskinfo file %s failed since %s", SMA_VID(pSma), type,
TD_TFILE_FULL_NAME(pIter->pTFile), terrstr());
return TSDB_CODE_FAILED;
}
@ -1130,8 +1132,8 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) {
infoItem.qTaskInfo = pIter->qBuf;
infoItem.len = tdRSmaQTaskInfoContLen(qTaskInfoLenWithHead);
// do the restore job
smaDebug("vgId:%d, restore the qtask info %s offset:%" PRIi64 "\n", SMA_VID(pSma),
TD_TFILE_FULL_NAME(pIter->pTFile), pIter->offset - pIter->nBytes + pIter->nBufPos);
smaDebug("vgId:%d, restore rsma task %" PRIi8 " from qtaskinfo file %s offset:%" PRIi64 "\n", SMA_VID(pSma),
type, TD_TFILE_FULL_NAME(pIter->pTFile), pIter->offset - pIter->nBytes + pIter->nBufPos);
tdRSmaQTaskInfoItemRestore(pSma, &infoItem);
pIter->qBuf = POINTER_SHIFT(pIter->qBuf, infoItem.len);

View File

@ -275,7 +275,7 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWrit
qWriter->pSma = pSma;
char qTaskInfoFullName[TSDB_FILENAME_LEN];
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), 1, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), 0, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
TdFilePtr qTaskF = taosCreateFile(qTaskInfoFullName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (!qTaskF) {
code = TAOS_SYSTEM_ERROR(errno);
@ -323,10 +323,19 @@ int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) {
// qtaskinfo
if (pWriter->pQTaskFWriter) {
char qTaskInfoFullName[TSDB_FILENAME_LEN];
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), 0, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
taosRenameFile(pWriter->pQTaskFWriter->fname, qTaskInfoFullName);
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pWriter->ever, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
if (taosRenameFile(pWriter->pQTaskFWriter->fname, qTaskInfoFullName) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
smaInfo("vgId:%d, vnode snapshot rsma writer rename %s to %s", SMA_VID(pWriter->pSma),
pWriter->pQTaskFWriter->fname, qTaskInfoFullName);
// rsma restore
if ((code = tdRsmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever)) < 0) {
goto _err;
}
smaInfo("vgId:%d, vnode snapshot rsma writer restore from %s succeed", SMA_VID(pWriter->pSma), qTaskInfoFullName);
}
}

View File

@ -408,6 +408,10 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
goto _end;
}
if (VND_IS_TSMA(pVnode)) {
tsdbDebug("vgId:%d, tsma is selected to query", TD_VID(pVnode));
}
initReaderStatus(&pReader->status);
pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
@ -1906,20 +1910,19 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret
++level;
}
int32_t vgId = TD_VID(pVnode);
const char* str = (idStr != NULL) ? idStr : "";
if (level == TSDB_RETENTION_L0) {
*pLevel = TSDB_RETENTION_L0;
tsdbDebug("vgId:%d, rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L0, str);
tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L0, str);
return VND_RSMA0(pVnode);
} else if (level == TSDB_RETENTION_L1) {
*pLevel = TSDB_RETENTION_L1;
tsdbDebug("vgId:%d, rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L1, str);
tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L1, str);
return VND_RSMA1(pVnode);
} else {
*pLevel = TSDB_RETENTION_L2;
tsdbDebug("vgId:%d, rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L2, str);
tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L2, str);
return VND_RSMA2(pVnode);
}
}

View File

@ -330,6 +330,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case TDMT_STREAM_TASK_RUN:
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH:
// return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, pInfo->workerId != 0);
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
case TDMT_STREAM_TASK_RECOVER:
return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
@ -490,11 +491,6 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
goto _exit;
}
if ((terrno = grantCheck(TSDB_GRANT_STABLE)) < 0) {
rcode = -1;
goto _exit;
}
if ((terrno = grantCheck(TSDB_GRANT_TABLE)) < 0) {
rcode = -1;
goto _exit;
@ -850,13 +846,6 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
goto _exit;
}
if ((terrno = grantCheck(TSDB_GRANT_STABLE)) < 0) {
pRsp->code = terrno;
tDecoderClear(&decoder);
taosArrayDestroy(createTbReq.ctb.tagName);
goto _exit;
}
if ((terrno = grantCheck(TSDB_GRANT_TABLE)) < 0) {
pRsp->code = terrno;
tDecoderClear(&decoder);

View File

@ -19,9 +19,8 @@
#define BATCH_DISABLE 1
static inline bool vnodeIsMsgBlock(tmsg_t type) {
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_CREATE_TABLE) ||
(type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || (type == TDMT_VND_UPDATE_TAG_VAL) ||
(type == TDMT_VND_ALTER_REPLICA);
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_REPLICA);
}
static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
@ -611,10 +610,10 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void
do {
int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
if (itemSize == 0) {
vDebug("vgId:%d, apply queue is empty, start write snapshot", pVnode->config.vgId);
vInfo("vgId:%d, start write vnode snapshot since apply queue is empty", pVnode->config.vgId);
break;
} else {
vDebug("vgId:%d, %d items in apply queue, write snapshot later", pVnode->config.vgId);
vInfo("vgId:%d, write vnode snapshot later since %d items in apply queue", pVnode->config.vgId);
taosMsleep(10);
}
} while (true);
@ -630,10 +629,11 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data;
vDebug("vgId:%d, stop write snapshot, isApply:%d", pVnode->config.vgId, isApply);
vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, pVnode->config.vgId, isApply,
pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastApplyIndex);
int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
vDebug("vgId:%d, apply snapshot to vnode, code:0x%x", pVnode->config.vgId, code);
vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code);
return code;
#else
taosMemoryFree(pWriter);
@ -644,8 +644,9 @@ static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool
static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data;
vDebug("vgId:%d, continue write vnode snapshot, len:%d", pVnode->config.vgId, len);
int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
vTrace("vgId:%d, write snapshot, len:%d", pVnode->config.vgId, len);
vDebug("vgId:%d, continue write vnode snapshot finished, len:%d", pVnode->config.vgId, len);
return code;
#else
return 0;

View File

@ -192,6 +192,26 @@ char* buildRetension(SArray* pRetension) {
return p1;
}
static const char* cacheModelStr(int8_t cacheModel) {
switch (cacheModel) {
case TSDB_CACHE_MODEL_NONE:
return TSDB_CACHE_MODEL_NONE_STR;
case TSDB_CACHE_MODEL_LAST_ROW:
return TSDB_CACHE_MODEL_LAST_ROW_STR;
case TSDB_CACHE_MODEL_LAST_VALUE:
return TSDB_CACHE_MODEL_LAST_VALUE_STR;
case TSDB_CACHE_MODEL_BOTH:
return TSDB_CACHE_MODEL_BOTH_STR;
default:
break;
}
return TSDB_CACHE_MODEL_NONE_STR;
}
static const char* strictStr(int8_t strict) {
return TSDB_DB_STRICT_ON == strict ? TSDB_DB_STRICT_ON_STR : TSDB_DB_STRICT_OFF_STR;
}
static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbFName, SDbCfgInfo* pCfg) {
blockDataEnsureCapacity(pBlock, 1);
pBlock->info.rows = 1;
@ -222,14 +242,15 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbFName, S
char* retentions = buildRetension(pCfg->pRetensions);
len += sprintf(buf2 + VARSTR_HEADER_SIZE,
"CREATE DATABASE `%s` BUFFER %d CACHEMODEL %d COMP %d DURATION %dm "
"FSYNC %d MAXROWS %d MINROWS %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d "
"STRICT %d WAL %d VGROUPS %d SINGLE_STABLE %d",
dbFName, pCfg->buffer, pCfg->cacheLast, pCfg->compression, pCfg->daysPerFile, pCfg->walFsyncPeriod,
pCfg->maxRows, pCfg->minRows, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2, pCfg->pages,
pCfg->pageSize, prec, pCfg->replications, pCfg->strict, pCfg->walLevel, pCfg->numOfVgroups,
1 == pCfg->numOfStables);
len += sprintf(
buf2 + VARSTR_HEADER_SIZE,
"CREATE DATABASE `%s` BUFFER %d CACHEMODEL '%s' COMP %d DURATION %dm "
"WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d "
"STRICT '%s' WAL_LEVEL %d VGROUPS %d SINGLE_STABLE %d",
dbFName, pCfg->buffer, cacheModelStr(pCfg->cacheLast), pCfg->compression, pCfg->daysPerFile, pCfg->walFsyncPeriod,
pCfg->maxRows, pCfg->minRows, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2, pCfg->pages,
pCfg->pageSize, prec, pCfg->replications, strictStr(pCfg->strict), pCfg->walLevel, pCfg->numOfVgroups,
1 == pCfg->numOfStables);
if (retentions) {
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, " RETENTIONS %s", retentions);
@ -383,21 +404,21 @@ int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) {
return TSDB_CODE_SUCCESS;
}
void appendTableOptions(char* buf, int32_t* len, STableCfg* pCfg) {
void appendTableOptions(char* buf, int32_t* len, SDbCfgInfo* pDbCfg, STableCfg* pCfg) {
if (pCfg->commentLen > 0) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, " COMMENT '%s'", pCfg->pComment);
} else if (0 == pCfg->commentLen) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, " COMMENT ''");
}
if (pCfg->watermark1 > 0) {
if (NULL != pDbCfg->pRetensions && pCfg->watermark1 > 0) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, " WATERMARK %" PRId64 "a", pCfg->watermark1);
if (pCfg->watermark2 > 0) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, ", %" PRId64 "a", pCfg->watermark2);
}
}
if (pCfg->delay1 > 0) {
if (NULL != pDbCfg->pRetensions && pCfg->delay1 > 0) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, " MAX_DELAY %" PRId64 "a", pCfg->delay1);
if (pCfg->delay2 > 0) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, ", %" PRId64 "a", pCfg->delay2);
@ -405,7 +426,7 @@ void appendTableOptions(char* buf, int32_t* len, STableCfg* pCfg) {
}
int32_t funcNum = taosArrayGetSize(pCfg->pFuncs);
if (funcNum > 0) {
if (NULL != pDbCfg->pRetensions && funcNum > 0) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, " ROLLUP(");
for (int32_t i = 0; i < funcNum; ++i) {
char* pFunc = taosArrayGet(pCfg->pFuncs, i);
@ -419,7 +440,7 @@ void appendTableOptions(char* buf, int32_t* len, STableCfg* pCfg) {
}
}
static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, char* tbName, STableCfg* pCfg) {
static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, SDbCfgInfo* pDbCfg, char* tbName, STableCfg* pCfg) {
int32_t code = 0;
blockDataEnsureCapacity(pBlock, 1);
pBlock->info.rows = 1;
@ -439,7 +460,7 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, char* tbName,
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, ") TAGS (");
appendTagFields(buf2, &len, pCfg);
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, ")");
appendTableOptions(buf2, &len, pCfg);
appendTableOptions(buf2, &len, pDbCfg, pCfg);
} else if (TSDB_CHILD_TABLE == pCfg->tableType) {
len += sprintf(buf2 + VARSTR_HEADER_SIZE, "CREATE TABLE `%s` USING `%s` (", tbName, pCfg->stbName);
appendTagNameFields(buf2, &len, pCfg);
@ -449,7 +470,7 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, char* tbName,
return code;
}
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, ")");
appendTableOptions(buf2, &len, pCfg);
appendTableOptions(buf2, &len, pDbCfg, pCfg);
} else {
len += sprintf(buf2 + VARSTR_HEADER_SIZE, "CREATE TABLE `%s` (", tbName);
appendColumnFields(buf2, &len, pCfg);
@ -465,7 +486,7 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, char* tbName,
static int32_t execShowCreateTable(SShowCreateTableStmt* pStmt, SRetrieveTableRsp** pRsp) {
SSDataBlock* pBlock = buildCreateTbResultDataBlock();
int32_t code = setCreateTBResultIntoDataBlock(pBlock, pStmt->tableName, pStmt->pCfg);
int32_t code = setCreateTBResultIntoDataBlock(pBlock, pStmt->pDbCfg, pStmt->tableName, pStmt->pTableCfg);
if (code) {
return code;
}
@ -473,7 +494,7 @@ static int32_t execShowCreateTable(SShowCreateTableStmt* pStmt, SRetrieveTableRs
}
static int32_t execShowCreateSTable(SShowCreateTableStmt* pStmt, SRetrieveTableRsp** pRsp) {
STableCfg* pCfg = (STableCfg*)pStmt->pCfg;
STableCfg* pCfg = (STableCfg*)pStmt->pTableCfg;
if (TSDB_SUPER_TABLE != pCfg->tableType) {
terrno = TSDB_CODE_TSC_NOT_STABLE_ERROR;
return terrno;

View File

@ -90,6 +90,7 @@ struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t i
static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos) {
SFilePage* bufPage = (SFilePage*)getBufPage(pBuf, pos->pageId);
setBufPageDirty(bufPage, true);
SResultRow* pRow = (SResultRow*)((char*)bufPage + pos->offset);
return pRow;
}

View File

@ -844,6 +844,7 @@ void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void cleanupBasicInfo(SOptrBasicInfo* pInfo);
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
void cleanupExprSupp(SExprSupp* pSup);
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey);
void initResultSizeInfo(SResultInfo * pResultInfo, int32_t numOfRows);

View File

@ -3328,17 +3328,19 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
return fillResult;
}
static void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
for (int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExprInfo = &pExpr[i];
for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) {
taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol);
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
if (pExpr) {
for (int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExprInfo = &pExpr[i];
for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) {
taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol);
}
}
}
taosMemoryFree(pExprInfo->base.pParam);
taosMemoryFree(pExprInfo->pExpr);
taosMemoryFree(pExprInfo->base.pParam);
taosMemoryFree(pExprInfo->pExpr);
}
}
}
@ -3768,11 +3770,7 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
taosMemoryFreeClear(pSchemaInfo->dbname);
if (pSchemaInfo->sw == NULL) {
return;
}
taosMemoryFree(pSchemaInfo->tablename);
taosMemoryFreeClear(pSchemaInfo->tablename);
tDeleteSSchemaWrapper(pSchemaInfo->sw);
tDeleteSSchemaWrapper(pSchemaInfo->qsw);
}

View File

@ -2246,7 +2246,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "derivative",
.type = FUNCTION_TYPE_DERIVATIVE,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translateDerivative,
.getEnvFunc = getDerivativeFuncEnv,
.initFunc = derivativeFuncSetup,
@ -2453,7 +2454,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "diff",
.type = FUNCTION_TYPE_DIFF,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_KEEP_ORDER_FUNC |
FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC,
.translateFunc = translateDiff,
.getEnvFunc = getDiffFuncEnv,
.initFunc = diffFunctionSetup,
@ -2487,7 +2489,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "csum",
.type = FUNCTION_TYPE_CSUM,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC |
FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_KEEP_ORDER_FUNC,
.translateFunc = translateCsum,
.getEnvFunc = getCsumFuncEnv,
.initFunc = functionSetup,
@ -2856,7 +2859,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "tbname",
.type = FUNCTION_TYPE_TBNAME,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_SCAN_PC_FUNC,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_SCAN_PC_FUNC | FUNC_MGT_KEEP_ORDER_FUNC,
.translateFunc = translateTbnameColumn,
.getEnvFunc = NULL,
.initFunc = NULL,
@ -2896,7 +2899,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "_wstart",
.type = FUNCTION_TYPE_WSTART,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC | FUNC_MGT_KEEP_ORDER_FUNC,
.translateFunc = translateTimePseudoColumn,
.getEnvFunc = getTimePseudoFuncEnv,
.initFunc = NULL,
@ -2906,7 +2909,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "_wend",
.type = FUNCTION_TYPE_WEND,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC | FUNC_MGT_KEEP_ORDER_FUNC,
.translateFunc = translateTimePseudoColumn,
.getEnvFunc = getTimePseudoFuncEnv,
.initFunc = NULL,
@ -2916,7 +2919,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "_wduration",
.type = FUNCTION_TYPE_WDURATION,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC | FUNC_MGT_KEEP_ORDER_FUNC,
.translateFunc = translateWduration,
.getEnvFunc = getTimePseudoFuncEnv,
.initFunc = NULL,
@ -2964,7 +2967,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "_group_key",
.type = FUNCTION_TYPE_GROUP_KEY,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_KEEP_ORDER_FUNC,
.translateFunc = translateGroupKey,
.getEnvFunc = getGroupKeyFuncEnv,
.initFunc = functionSetup,

View File

@ -976,8 +976,12 @@ int32_t cleanUpUdfs() {
}
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
int32_t i = 0;
if (gUdfdProxy.udfStubs == NULL || taosArrayGetSize(gUdfdProxy.udfStubs) == 0) {
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
return TSDB_CODE_SUCCESS;
}
SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
int32_t i = 0;
while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) {
SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i);
if (stub->refCount == 0) {

View File

@ -82,7 +82,9 @@ static bool columnNodeEqual(const SColumnNode* a, const SColumnNode* b) {
COMPARE_STRING_FIELD(dbName);
COMPARE_STRING_FIELD(tableName);
COMPARE_STRING_FIELD(colName);
COMPARE_STRING_FIELD(tableAlias);
if (0 == a->tableId) {
COMPARE_STRING_FIELD(tableAlias);
}
return true;
}

View File

@ -713,7 +713,8 @@ void nodesDestroyNode(SNode* pNode) {
break;
case QUERY_NODE_SHOW_CREATE_TABLE_STMT:
case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
destroyTableCfg((STableCfg*)(((SShowCreateTableStmt*)pNode)->pCfg));
taosMemoryFreeClear(((SShowCreateTableStmt*)pNode)->pDbCfg);
destroyTableCfg((STableCfg*)(((SShowCreateTableStmt*)pNode)->pTableCfg));
break;
case QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT: // no pointer field
case QUERY_NODE_KILL_CONNECTION_STMT: // no pointer field
@ -1817,3 +1818,19 @@ int32_t nodesMergeConds(SNode** pDst, SNodeList** pSrc) {
return TSDB_CODE_SUCCESS;
}
const char* dataOrderStr(EDataOrderLevel order) {
switch (order) {
case DATA_ORDER_LEVEL_NONE:
return "no order required";
case DATA_ORDER_LEVEL_IN_BLOCK:
return "in-datablock order";
case DATA_ORDER_LEVEL_IN_GROUP:
return "in-group order";
case DATA_ORDER_LEVEL_GLOBAL:
return "global order";
default:
break;
}
return "unknown";
}

View File

@ -176,11 +176,11 @@ SNode* createDropUserStmt(SAstCreateContext* pCxt, SToken* pUserName);
SNode* createCreateDnodeStmt(SAstCreateContext* pCxt, const SToken* pFqdn, const SToken* pPort);
SNode* createDropDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode);
SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const SToken* pConfig, const SToken* pValue);
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool ignoreExists, SToken* pIndexName,
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool ignoreExists, SNode* pIndexName,
SNode* pRealTable, SNodeList* pCols, SNode* pOptions);
SNode* createIndexOption(SAstCreateContext* pCxt, SNodeList* pFuncs, SNode* pInterval, SNode* pOffset, SNode* pSliding,
SNode* pStreamOptions);
SNode* createDropIndexStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pIndexName);
SNode* createDropIndexStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SNode* pIndexName);
SNode* createCreateComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDnodeId);
SNode* createDropComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDnodeId);
SNode* createCreateTopicStmtUseQuery(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pTopicName,

View File

@ -424,8 +424,8 @@ from_db_opt(A) ::= FROM db_name(B).
/************************************************ create index ********************************************************/
cmd ::= CREATE SMA INDEX not_exists_opt(D)
index_name(A) ON full_table_name(B) index_options(C). { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_SMA, D, &A, B, NULL, C); }
cmd ::= DROP INDEX exists_opt(B) index_name(A). { pCxt->pRootNode = createDropIndexStmt(pCxt, B, &A); }
full_table_name(A) ON full_table_name(B) index_options(C). { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_SMA, D, A, B, NULL, C); }
cmd ::= DROP INDEX exists_opt(B) full_table_name(A). { pCxt->pRootNode = createDropIndexStmt(pCxt, B, A); }
index_options(A) ::= FUNCTION NK_LP func_list(B) NK_RP INTERVAL
NK_LP duration_literal(C) NK_RP sliding_opt(D) sma_stream_opt(E). { A = createIndexOption(pCxt, B, releaseRawExprNode(pCxt, C), NULL, D, E); }
@ -608,10 +608,6 @@ column_alias(A) ::= NK_ID(B).
%destructor user_name { }
user_name(A) ::= NK_ID(B). { A = B; }
%type index_name { SToken }
%destructor index_name { }
index_name(A) ::= NK_ID(B). { A = B; }
%type topic_name { SToken }
%destructor topic_name { }
topic_name(A) ::= NK_ID(B). { A = B; }

View File

@ -767,6 +767,7 @@ SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pPr
select->pFromTable = pTable;
sprintf(select->stmtName, "%p", select);
select->isTimeLineResult = true;
select->onlyHasKeepOrderFunc = true;
select->timeRange = TSWINDOW_INITIALIZER;
return (SNode*)select;
}
@ -1402,19 +1403,18 @@ SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const
return (SNode*)pStmt;
}
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool ignoreExists, SToken* pIndexName,
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool ignoreExists, SNode* pIndexName,
SNode* pRealTable, SNodeList* pCols, SNode* pOptions) {
CHECK_PARSER_STATUS(pCxt);
if (!checkIndexName(pCxt, pIndexName)) {
return NULL;
}
SCreateIndexStmt* pStmt = (SCreateIndexStmt*)nodesMakeNode(QUERY_NODE_CREATE_INDEX_STMT);
CHECK_OUT_OF_MEM(pStmt);
pStmt->indexType = type;
pStmt->ignoreExists = ignoreExists;
COPY_STRING_FORM_ID_TOKEN(pStmt->indexName, pIndexName);
strcpy(pStmt->indexDbName, ((SRealTableNode*)pIndexName)->table.dbName);
strcpy(pStmt->indexName, ((SRealTableNode*)pIndexName)->table.tableName);
strcpy(pStmt->dbName, ((SRealTableNode*)pRealTable)->table.dbName);
strcpy(pStmt->tableName, ((SRealTableNode*)pRealTable)->table.tableName);
nodesDestroyNode(pIndexName);
nodesDestroyNode(pRealTable);
pStmt->pCols = pCols;
pStmt->pOptions = (SIndexOptions*)pOptions;
@ -1434,15 +1434,14 @@ SNode* createIndexOption(SAstCreateContext* pCxt, SNodeList* pFuncs, SNode* pInt
return (SNode*)pOptions;
}
SNode* createDropIndexStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pIndexName) {
SNode* createDropIndexStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SNode* pIndexName) {
CHECK_PARSER_STATUS(pCxt);
if (!checkDbName(pCxt, NULL, true) || !checkIndexName(pCxt, pIndexName)) {
return NULL;
}
SDropIndexStmt* pStmt = (SDropIndexStmt*)nodesMakeNode(QUERY_NODE_DROP_INDEX_STMT);
CHECK_OUT_OF_MEM(pStmt);
pStmt->ignoreNotExists = ignoreNotExists;
COPY_STRING_FORM_ID_TOKEN(pStmt->indexName, pIndexName);
strcpy(pStmt->indexDbName, ((SRealTableNode*)pIndexName)->table.dbName);
strcpy(pStmt->indexName, ((SRealTableNode*)pIndexName)->table.tableName);
nodesDestroyNode(pIndexName);
return (SNode*)pStmt;
}

View File

@ -269,16 +269,15 @@ static int32_t collectMetaKeyFromUseDatabase(SCollectMetaKeyCxt* pCxt, SUseDatab
static int32_t collectMetaKeyFromCreateIndex(SCollectMetaKeyCxt* pCxt, SCreateIndexStmt* pStmt) {
int32_t code = TSDB_CODE_SUCCESS;
if (INDEX_TYPE_SMA == pStmt->indexType) {
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->tableName, pCxt->pMetaCache);
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code =
reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->tableName, pCxt->pMetaCache);
code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
}
if (TSDB_CODE_SUCCESS == code) {
code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pCxt->pMetaCache);
code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
}
if (TSDB_CODE_SUCCESS == code) {
code = reserveDbCfgInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pCxt->pMetaCache);
code = reserveDbCfgInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
}
}
return code;
@ -366,8 +365,8 @@ static int32_t collectMetaKeyFromShowStreams(SCollectMetaKeyCxt* pCxt, SShowStmt
}
static int32_t collectMetaKeyFromShowTables(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB,
TSDB_INS_TABLE_TABLES, pCxt->pMetaCache);
int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_TABLES,
pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
if (NULL != pStmt->pDbName) {
code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, ((SValueNode*)pStmt->pDbName)->literal, pCxt->pMetaCache);
@ -457,6 +456,9 @@ static int32_t collectMetaKeyFromShowCreateTable(SCollectMetaKeyCxt* pCxt, SShow
if (TSDB_CODE_SUCCESS == code) {
code = reserveTableCfgInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
}
if (TSDB_CODE_SUCCESS == code) {
code = reserveDbCfgInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
}
return code;
}

View File

@ -1171,6 +1171,44 @@ static int32_t translateMultiRowsFunc(STranslateContext* pCxt, SFunctionNode* pF
return TSDB_CODE_SUCCESS;
}
static int32_t translateInterpFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
if (!fmIsInterpFunc(pFunc->funcId)) {
return TSDB_CODE_SUCCESS;
}
if (!isSelectStmt(pCxt->pCurrStmt) || SQL_CLAUSE_SELECT != pCxt->currClause) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC);
}
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
if (pSelect->hasAggFuncs || pSelect->hasMultiRowsFunc || pSelect->hasIndefiniteRowsFunc) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC);
}
if (NULL != pSelect->pWindow || NULL != pSelect->pGroupByList) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
"%s function is not supported in window query or group query", pFunc->functionName);
}
if (hasInvalidFuncNesting(pFunc->pParameterList)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_AGG_FUNC_NESTING);
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateTimelineFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
if (!fmIsTimelineFunc(pFunc->funcId)) {
return TSDB_CODE_SUCCESS;
}
if (!isSelectStmt(pCxt->pCurrStmt)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
"%s function must be used in select statements", pFunc->functionName);
}
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
if (QUERY_NODE_TEMP_TABLE == nodeType(pSelect->pFromTable) &&
!isTimeLineQuery(((STempTableNode*)pSelect->pFromTable)->pSubquery)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
"%s function requires valid time series input", pFunc->functionName);
}
return TSDB_CODE_SUCCESS;
}
static bool hasFillClause(SNode* pCurrStmt) {
if (!isSelectStmt(pCurrStmt)) {
return false;
@ -1291,6 +1329,7 @@ static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) {
pSelect->hasInterpFunc = pSelect->hasInterpFunc ? true : (FUNCTION_TYPE_INTERP == pFunc->funcType);
pSelect->hasLastRowFunc = pSelect->hasLastRowFunc ? true : (FUNCTION_TYPE_LAST_ROW == pFunc->funcType);
pSelect->hasTimeLineFunc = pSelect->hasTimeLineFunc ? true : fmIsTimelineFunc(pFunc->funcId);
pSelect->onlyHasKeepOrderFunc = pSelect->onlyHasKeepOrderFunc ? fmIsKeepOrderFunc(pFunc->funcId) : false;
}
}
@ -1409,6 +1448,12 @@ static int32_t translateNoramlFunction(STranslateContext* pCxt, SFunctionNode* p
if (TSDB_CODE_SUCCESS == code) {
code = translateMultiRowsFunc(pCxt, pFunc);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateInterpFunc(pCxt, pFunc);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateTimelineFunc(pCxt, pFunc);
}
if (TSDB_CODE_SUCCESS == code) {
setFuncClassification(pCxt->pCurrStmt, pFunc);
}
@ -1685,6 +1730,9 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect)
(!pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc && !pSelect->hasInterpFunc)) {
return TSDB_CODE_SUCCESS;
}
if (!pSelect->onlyHasKeepOrderFunc) {
pSelect->isTimeLineResult = false;
}
CheckAggColCoexistCxt cxt = {.pTranslateCxt = pCxt, .existCol = false};
nodesRewriteExprs(pSelect->pProjectionList, doCheckAggColCoexist, &cxt);
if (!pSelect->isDistinct) {
@ -1751,8 +1799,7 @@ static int32_t dnodeToVgroupsInfo(SArray* pDnodes, SVgroupsInfo** pVgsInfo) {
static bool sysTableFromVnode(const char* pTable) {
return (0 == strcmp(pTable, TSDB_INS_TABLE_TABLES)) ||
(0 == strcmp(pTable, TSDB_INS_TABLE_TABLE_DISTRIBUTED) ||
(0 == strcmp(pTable, TSDB_INS_TABLE_TAGS)));
(0 == strcmp(pTable, TSDB_INS_TABLE_TABLE_DISTRIBUTED) || (0 == strcmp(pTable, TSDB_INS_TABLE_TAGS)));
}
static bool sysTableFromDnode(const char* pTable) { return 0 == strcmp(pTable, TSDB_INS_TABLE_DNODE_VARIABLES); }
@ -2182,9 +2229,9 @@ static int32_t translateOrderBy(STranslateContext* pCxt, SSelectStmt* pSelect) {
}
pCxt->currClause = SQL_CLAUSE_ORDER_BY;
code = translateExprList(pCxt, pSelect->pOrderByList);
if (TSDB_CODE_SUCCESS == code) {
code = checkExprListForGroupBy(pCxt, pSelect, pSelect->pOrderByList);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = checkExprListForGroupBy(pCxt, pSelect, pSelect->pOrderByList);
}
return code;
}
@ -2265,15 +2312,15 @@ static int32_t translateHaving(STranslateContext* pCxt, SSelectStmt* pSelect) {
}
static int32_t translateGroupBy(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (NULL != pSelect->pGroupByList && NULL != pSelect->pWindow) {
if (NULL == pSelect->pGroupByList) {
return TSDB_CODE_SUCCESS;
}
if (NULL != pSelect->pWindow) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GROUPBY_WINDOW_COEXIST);
}
if (NULL != pSelect->pGroupByList) {
pCxt->currClause = SQL_CLAUSE_GROUP_BY;
pSelect->isTimeLineResult = false;
return translateExprList(pCxt, pSelect->pGroupByList);
}
return TSDB_CODE_SUCCESS;
pCxt->currClause = SQL_CLAUSE_GROUP_BY;
pSelect->isTimeLineResult = false;
return translateExprList(pCxt, pSelect->pGroupByList);
}
static int32_t getTimeRange(SNode** pPrimaryKeyCond, STimeWindow* pTimeRange, bool* pIsStrict) {
@ -2496,6 +2543,7 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (NULL == pSelect->pWindow) {
return TSDB_CODE_SUCCESS;
}
pSelect->isTimeLineResult = true;
pCxt->currClause = SQL_CLAUSE_WINDOW;
int32_t code = translateExpr(pCxt, &pSelect->pWindow);
if (TSDB_CODE_SUCCESS == code) {
@ -2562,12 +2610,13 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) {
return code;
}
static int32_t translatePartitionBy(STranslateContext* pCxt, SNodeList* pPartitionByList) {
if (NULL == pPartitionByList) {
static int32_t translatePartitionBy(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (NULL == pSelect->pPartitionByList) {
return TSDB_CODE_SUCCESS;
}
pSelect->isTimeLineResult = false;
pCxt->currClause = SQL_CLAUSE_PARTITION_BY;
return translateExprList(pCxt, pPartitionByList);
return translateExprList(pCxt, pSelect->pPartitionByList);
}
static int32_t translateWhere(STranslateContext* pCxt, SSelectStmt* pSelect) {
@ -2670,11 +2719,36 @@ static EDealRes replaceOrderByAliasImpl(SNode** pNode, void* pContext) {
}
static int32_t replaceOrderByAlias(STranslateContext* pCxt, SNodeList* pProjectionList, SNodeList* pOrderByList) {
if (NULL == pOrderByList) {
return TSDB_CODE_SUCCESS;
}
SReplaceOrderByAliasCxt cxt = {.pTranslateCxt = pCxt, .pProjectionList = pProjectionList};
nodesRewriteExprsPostOrder(pOrderByList, replaceOrderByAliasImpl, &cxt);
return pCxt->errCode;
}
static void resetResultTimeline(SSelectStmt* pSelect) {
if (NULL == pSelect->pOrderByList) {
return;
}
SNode* pOrder = ((SOrderByExprNode*)nodesListGetNode(pSelect->pOrderByList, 0))->pExpr;
if ((QUERY_NODE_TEMP_TABLE == nodeType(pSelect->pFromTable) &&
isPrimaryKey((STempTableNode*)pSelect->pFromTable, pOrder)) ||
(QUERY_NODE_TEMP_TABLE != nodeType(pSelect->pFromTable) && isPrimaryKeyImpl(pOrder))) {
pSelect->isTimeLineResult = true;
} else {
pSelect->isTimeLineResult = false;
}
}
static int32_t replaceOrderByAliasForSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
int32_t code = replaceOrderByAlias(pCxt, pSelect->pProjectionList, pSelect->pOrderByList);
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
resetResultTimeline(pSelect);
}
return code;
}
static int32_t translateSelectWithoutFrom(STranslateContext* pCxt, SSelectStmt* pSelect) {
pCxt->pCurrStmt = (SNode*)pSelect;
pCxt->currClause = SQL_CLAUSE_SELECT;
@ -2689,7 +2763,7 @@ static int32_t translateSelectFrom(STranslateContext* pCxt, SSelectStmt* pSelect
code = translateWhere(pCxt, pSelect);
}
if (TSDB_CODE_SUCCESS == code) {
code = translatePartitionBy(pCxt, pSelect->pPartitionByList);
code = translatePartitionBy(pCxt, pSelect);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateWindow(pCxt, pSelect);
@ -2722,7 +2796,7 @@ static int32_t translateSelectFrom(STranslateContext* pCxt, SSelectStmt* pSelect
code = appendTsForImplicitTsFunc(pCxt, pSelect);
}
if (TSDB_CODE_SUCCESS == code) {
code = replaceOrderByAlias(pCxt, pSelect->pProjectionList, pSelect->pOrderByList);
code = replaceOrderByAliasForSelect(pCxt, pSelect);
}
return code;
}
@ -3705,6 +3779,11 @@ static int32_t checkTableWatermarkOption(STranslateContext* pCxt, STableOptions*
}
static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt, bool createStable) {
if (NULL != strchr(pStmt->tableName, '.')) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME,
"The table name cannot contain '.'");
}
SDbCfgInfo dbCfg = {0};
int32_t code = getDBCfg(pCxt, pStmt->dbName, &dbCfg);
if (TSDB_CODE_SUCCESS == code && !createStable && NULL != dbCfg.pRetensions) {
@ -4282,9 +4361,10 @@ static int32_t translateAlterDnode(STranslateContext* pCxt, SAlterDnodeStmt* pSt
return buildCmdMsg(pCxt, TDMT_MND_CONFIG_DNODE, (FSerializeFunc)tSerializeSMCfgDnodeReq, &cfgReq);
}
static int32_t getSmaIndexDstVgId(STranslateContext* pCxt, char* pTableName, int32_t* pVgId) {
static int32_t getSmaIndexDstVgId(STranslateContext* pCxt, const char* pDbName, const char* pTableName,
int32_t* pVgId) {
SVgroupInfo vg = {0};
int32_t code = getTableHashVgroup(pCxt, pCxt->pParseCxt->db, pTableName, &vg);
int32_t code = getTableHashVgroup(pCxt, pDbName, pTableName, &vg);
if (TSDB_CODE_SUCCESS == code) {
*pVgId = vg.vgId;
}
@ -4301,7 +4381,7 @@ static int32_t getSmaIndexSql(STranslateContext* pCxt, char** pSql, int32_t* pLe
}
static int32_t buildSampleAstInfoByIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt, SSampleAstInfo* pInfo) {
pInfo->pDbName = pCxt->pParseCxt->db;
pInfo->pDbName = pStmt->dbName;
pInfo->pTableName = pStmt->tableName;
pInfo->pFuncs = nodesCloneList(pStmt->pOptions->pFuncs);
pInfo->pInterval = nodesCloneNode(pStmt->pOptions->pInterval);
@ -4328,7 +4408,7 @@ static int32_t getSmaIndexAst(STranslateContext* pCxt, SCreateIndexStmt* pStmt,
static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStmt, SMCreateSmaReq* pReq) {
SName name;
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->indexName, &name), pReq->name);
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->indexDbName, pStmt->indexName, &name), pReq->name);
memset(&name, 0, sizeof(SName));
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &name), pReq->stb);
pReq->igExists = pStmt->ignoreExists;
@ -4352,7 +4432,7 @@ static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStm
}
}
int32_t code = getSmaIndexDstVgId(pCxt, pStmt->tableName, &pReq->dstVgId);
int32_t code = getSmaIndexDstVgId(pCxt, pStmt->dbName, pStmt->tableName, &pReq->dstVgId);
if (TSDB_CODE_SUCCESS == code) {
code = getSmaIndexSql(pCxt, &pReq->sql, &pReq->sqlLen);
}
@ -4365,7 +4445,7 @@ static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStm
static int32_t checkCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt) {
SDbCfgInfo dbCfg = {0};
int32_t code = getDBCfg(pCxt, pCxt->pParseCxt->db, &dbCfg);
int32_t code = getDBCfg(pCxt, pStmt->dbName, &dbCfg);
if (TSDB_CODE_SUCCESS == code && NULL != dbCfg.pRetensions) {
code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_SMA_INDEX,
"Tables configured with the 'ROLLUP' option do not support creating sma index");
@ -4883,10 +4963,17 @@ static int32_t translateShowCreateDatabase(STranslateContext* pCxt, SShowCreateD
}
static int32_t translateShowCreateTable(STranslateContext* pCxt, SShowCreateTableStmt* pStmt) {
SName name;
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &name);
return getTableCfg(pCxt, &name, (STableCfg**)&pStmt->pCfg);
pStmt->pDbCfg = taosMemoryCalloc(1, sizeof(SDbCfgInfo));
if (NULL == pStmt->pDbCfg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = getDBCfg(pCxt, pStmt->dbName, (SDbCfgInfo*)pStmt->pDbCfg);
if (TSDB_CODE_SUCCESS == code) {
SName name;
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &name);
code = getTableCfg(pCxt, &name, (STableCfg**)&pStmt->pTableCfg);
}
return code;
}
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
@ -5917,6 +6004,10 @@ static int32_t checkCreateSubTable(STranslateContext* pCxt, SCreateSubTableClaus
if (0 != strcmp(pStmt->dbName, pStmt->useDbName)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_CORRESPONDING_STABLE_ERR);
}
if (NULL != strchr(pStmt->tableName, '.')) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME,
"The table name cannot contain '.'");
}
return TSDB_CODE_SUCCESS;
}
static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableClause* pStmt, SHashObj* pVgroupHashmap) {

File diff suppressed because it is too large Load Diff

View File

@ -316,7 +316,8 @@ TEST_F(ParserSelectTest, subquery) {
run("SELECT SUM(a) FROM (SELECT MAX(c1) a, ts FROM st1s1 PARTITION BY TBNAME INTERVAL(1m)) INTERVAL(1n)");
run("SELECT SUM(a) FROM (SELECT MAX(c1) a, _wstart FROM st1s1 PARTITION BY TBNAME INTERVAL(1m)) INTERVAL(1n)");
run("SELECT SUM(a) FROM (SELECT MAX(c1) a, _wstart FROM st1s1 PARTITION BY TBNAME INTERVAL(1m) ORDER BY _WSTART) "
"INTERVAL(1n)");
run("SELECT _C0 FROM (SELECT _ROWTS, ts FROM st1s1)");

View File

@ -54,7 +54,8 @@ TEST_F(ParserShowToUseTest, showCreateSTable) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_SHOW_CREATE_STABLE_STMT);
ASSERT_EQ(pQuery->execMode, QUERY_EXEC_MODE_LOCAL);
ASSERT_TRUE(pQuery->haveResultSet);
ASSERT_NE(((SShowCreateTableStmt*)pQuery->pRoot)->pCfg, nullptr);
ASSERT_NE(((SShowCreateTableStmt*)pQuery->pRoot)->pDbCfg, nullptr);
ASSERT_NE(((SShowCreateTableStmt*)pQuery->pRoot)->pTableCfg, nullptr);
});
run("SHOW CREATE STABLE st1");
@ -67,7 +68,8 @@ TEST_F(ParserShowToUseTest, showCreateTable) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_SHOW_CREATE_TABLE_STMT);
ASSERT_EQ(pQuery->execMode, QUERY_EXEC_MODE_LOCAL);
ASSERT_TRUE(pQuery->haveResultSet);
ASSERT_NE(((SShowCreateTableStmt*)pQuery->pRoot)->pCfg, nullptr);
ASSERT_NE(((SShowCreateTableStmt*)pQuery->pRoot)->pDbCfg, nullptr);
ASSERT_NE(((SShowCreateTableStmt*)pQuery->pRoot)->pTableCfg, nullptr);
});
run("SHOW CREATE TABLE t1");

View File

@ -23,13 +23,13 @@ extern "C" {
#include "planner.h"
#include "taoserror.h"
#define planFatal(param, ...) qFatal("PLAN: " param, __VA_ARGS__)
#define planError(param, ...) qError("PLAN: " param, __VA_ARGS__)
#define planWarn(param, ...) qWarn("PLAN: " param, __VA_ARGS__)
#define planInfo(param, ...) qInfo("PLAN: " param, __VA_ARGS__)
#define planDebug(param, ...) qDebug("PLAN: " param, __VA_ARGS__)
#define planDebugL(param, ...) qDebugL("PLAN: " param, __VA_ARGS__)
#define planTrace(param, ...) qTrace("PLAN: " param, __VA_ARGS__)
#define planFatal(param, ...) qFatal("PLAN: " param, ##__VA_ARGS__)
#define planError(param, ...) qError("PLAN: " param, ##__VA_ARGS__)
#define planWarn(param, ...) qWarn("PLAN: " param, ##__VA_ARGS__)
#define planInfo(param, ...) qInfo("PLAN: " param, ##__VA_ARGS__)
#define planDebug(param, ...) qDebug("PLAN: " param, ##__VA_ARGS__)
#define planDebugL(param, ...) qDebugL("PLAN: " param, ##__VA_ARGS__)
#define planTrace(param, ...) qTrace("PLAN: " param, ##__VA_ARGS__)
int32_t generateUsageErrMsg(char* pBuf, int32_t len, int32_t errCode, ...);
int32_t createColumnByRewriteExprs(SNodeList* pExprs, SNodeList** pList);

View File

@ -480,6 +480,7 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
pAgg->hasLastRow = pSelect->hasLastRowFunc;
pAgg->hasTimeLineFunc = pSelect->hasTimeLineFunc;
pAgg->onlyHasKeepOrderFunc = pSelect->onlyHasKeepOrderFunc;
pAgg->node.groupAction = GROUP_ACTION_SET;
pAgg->node.requireDataOrder = pAgg->hasTimeLineFunc ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_NONE;
pAgg->node.resultDataOrder = DATA_ORDER_LEVEL_NONE;

View File

@ -146,19 +146,13 @@ static int32_t adjustJoinDataRequirement(SJoinLogicNode* pJoin, EDataOrderLevel
return TSDB_CODE_SUCCESS;
}
static bool isKeepOrderAggFunc(SNodeList* pFuncs) {
SNode* pFunc = NULL;
FOREACH(pFunc, pFuncs) {
if (!fmIsKeepOrderFunc(((SFunctionNode*)pFunc)->funcId)) {
return false;
}
}
return true;
}
static int32_t adjustAggDataRequirement(SAggLogicNode* pAgg, EDataOrderLevel requirement) {
// The sort level of agg with group by output data can only be DATA_ORDER_LEVEL_NONE
if (requirement > DATA_ORDER_LEVEL_NONE && (NULL != pAgg->pGroupKeys || !isKeepOrderAggFunc(pAgg->pAggFuncs))) {
if (requirement > DATA_ORDER_LEVEL_NONE && (NULL != pAgg->pGroupKeys || !pAgg->onlyHasKeepOrderFunc)) {
planError(
"The output of aggregate cannot meet the requirements(%s) of the upper operator. "
"Illegal statement, should be intercepted in parser",
dataOrderStr(requirement));
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
pAgg->node.resultDataOrder = requirement;
@ -231,6 +225,10 @@ static int32_t adjustSortDataRequirement(SSortLogicNode* pSort, EDataOrderLevel
static int32_t adjustPartitionDataRequirement(SPartitionLogicNode* pPart, EDataOrderLevel requirement) {
if (DATA_ORDER_LEVEL_GLOBAL == requirement) {
planError(
"The output of partition cannot meet the requirements(%s) of the upper operator. "
"Illegal statement, should be intercepted in parser",
dataOrderStr(requirement));
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
pPart->node.resultDataOrder = requirement;

View File

@ -167,12 +167,13 @@ void streamFreeQitem(SStreamQueueItem* data) {
SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data;
int32_t sz = taosArrayGetSize(pMerge->reqs);
for (int32_t i = 0; i < sz; i++) {
int32_t* ref = taosArrayGetP(pMerge->dataRefs, i);
(*ref)--;
if (*ref == 0) {
int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i);
int32_t ref = atomic_sub_fetch_32(pRef, 1);
ASSERT(ref >= 0);
if (ref == 0) {
void* data = taosArrayGetP(pMerge->reqs, i);
taosMemoryFree(data);
taosMemoryFree(ref);
taosMemoryFree(pRef);
}
}
taosArrayDestroy(pMerge->reqs);

View File

@ -72,7 +72,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
continue;
}
qDebug("task %d(child %d) executed and get block");
qDebug("task %d(child %d) executed and get block", pTask->taskId, pTask->selfChildId);
SSDataBlock block = {0};
assignOneDataBlock(&block, output);
@ -241,6 +241,8 @@ int32_t streamExec(SStreamTask* pTask) {
pRes = streamExecForQall(pTask, pRes);
if (pRes == NULL) goto FAIL;
// temporarily disable status closing, since it runs out of threads
#if 0
// set status closing
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__CLOSING);
@ -248,6 +250,7 @@ int32_t streamExec(SStreamTask* pTask) {
qDebug("stream exec, enter closing status");
pRes = streamExecForQall(pTask, pRes);
if (pRes == NULL) goto FAIL;
#endif
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);

View File

@ -484,7 +484,7 @@ SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapsho
lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[i];
}
}
sTrace("vgId:%d, sync get snapshot last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
snapshotLastApplyIndex, lastIndex);
return lastIndex;

View File

@ -41,18 +41,21 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
// syncNodePingAll(ths);
// syncNodePingPeers(ths);
sTrace("vgId:%d, sync timeout, type:ping count:%d", ths->vgId, ths->pingTimerCounter);
syncNodeTimerRoutine(ths);
}
} else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) {
if (atomic_load_64(&ths->electTimerLogicClockUser) <= pMsg->logicClock) {
++(ths->electTimerCounter);
sInfo("vgId:%d, sync timeout, type:election count:%d", ths->vgId, ths->electTimerCounter);
syncNodeElect(ths);
}
} else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) {
if (atomic_load_64(&ths->heartbeatTimerLogicClockUser) <= pMsg->logicClock) {
++(ths->heartbeatTimerCounter);
sInfo("vgId:%d, sync timeout, type:replicate count:%d", ths->vgId, ths->heartbeatTimerCounter);
syncNodeReplicate(ths);
}
} else {

View File

@ -612,6 +612,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_QTASKINFO_CREATE, "Rsma qtaskinfo creation error")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FILE_CORRUPTED, "Rsma file corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists")
//index
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")

View File

@ -331,7 +331,7 @@
./test.sh -f tsim/vnode/stable_replica3_vnode3.sim
# --- sync
./test.sh -f tsim/sync/3Replica1VgElect.sim
#./test.sh -f tsim/sync/3Replica1VgElect.sim
#./test.sh -f tsim/sync/3Replica5VgElect.sim
./test.sh -f tsim/sync/oneReplica1VgElect.sim
./test.sh -f tsim/sync/oneReplica5VgElect.sim

View File

@ -140,6 +140,8 @@ if $rows != $totalTblNum then
return -1
endi
print ====> start_switch_leader:
start_switch_leader:
$switch_loop_cnt = 0
@ -299,7 +301,7 @@ print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $da
print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6]
print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6]
print ===> $rows $data[4][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6]
print ===> $rows $data[4][0] $data[4][1] $data[4][2] $data[4][3] $data[4][4] $data[4][5] $data[4][6]
if $rows != 5 then
return -1
@ -341,6 +343,8 @@ print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $da
print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6]
print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6]
print ===> $rows $data[4][0] $data[4][1] $data[4][2] $data[4][3] $data[4][4] $data[4][5] $data[4][6]
if $data[0][0] != 1 then
return -1
endi
@ -358,6 +362,8 @@ if $data[3][4] != ready then
goto check_dnode_ready_2
endi
print ====> final test: create child table ctb2* and table ntb2*
sql use db;
$ctbPrefix = ctb2
$ntbPrefix = ntb2

View File

@ -542,6 +542,8 @@ if $data[3][4] != ready then
goto check_dnode_ready_2
endi
print ====> final test: create child table ctb2* and table ntb2*
sql use db;
$ctbPrefix = ctb2
$ntbPrefix = ntb2

View File

@ -0,0 +1,88 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.special_name = ['!','@','#','$','%','^','&','*','(',')','[',']','{','}',\
':',';','\'','\"',',','<','>','/','?','-','_','+','=','~']
def db_name_check(self):
dbname = tdCom.getLongName(10)
for j in self.special_name:
for i in range(len(list(dbname))+1):
new_dbname = list(dbname)
new_dbname.insert(i,j)
dbname_1 = ''.join(new_dbname)
tdSql.execute(f'create database if not exists `{dbname_1}`')
tdSql.query('show databases')
tdSql.checkEqual(tdSql.queryResult[2][0],str(dbname_1))
tdSql.execute(f'drop database `{dbname_1}`')
for i in range(len(list(dbname))+1):
new_dbname = list(dbname)
new_dbname.insert(i,'.')
dbname_1 = ''.join(new_dbname)
tdSql.error(f'create database if not exists `{dbname_1}`')
def tb_name_check(self):
dbname = tdCom.getLongName(10)
tdSql.execute(f'create database if not exists `{dbname}`')
tdSql.execute(f'use `{dbname}`')
tbname = tdCom.getLongName(5)
for i in self.special_name:
for j in range(len(list(tbname))+1):
tbname1 = list(tbname)
tbname1.insert(j,i)
new_tbname = ''.join(tbname1)
for sql in [f'`{dbname}`.`{new_tbname}`',f'`{new_tbname}`']:
tdSql.execute(f'create table {sql} (ts timestamp,c0 int)')
tdSql.execute(f'insert into {sql} values(now,1)')
tdSql.query(f'select * from {sql}')
tdSql.checkRows(1)
tdSql.execute(f'drop table {sql}')
for i in range(len(list(tbname))+1):
tbname1 = list(tbname)
tbname1.insert(i,'.')
new_tbname = ''.join(tbname1)
for sql in [f'`{dbname}`.`{new_tbname}`',f'`{new_tbname}`']:
tdSql.error(f'create table {sql} (ts timestamp,c0 int)')
tdSql.execute(f'drop database `{dbname}`')
def run(self):
self.db_name_check()
self.tb_name_check()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -49,29 +49,11 @@ class TDTestCase:
tdSql.execute(f"insert into {dbname}.ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
tdSql.execute(f"insert into {dbname}.ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
# tdSql.execute(
# f'''insert into t1 values
# ( '2020-04-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
# ( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a )
# ( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a )
# ( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a )
# ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a )
# ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
# ( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a )
# ( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a )
# ( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" )
# ( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" )
# ( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" )
# ( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
# '''
# )
def restart_taosd_query_sum(self, dbname="db"):
for i in range(5):
tdLog.info(" this is %d_th restart taosd " %i)
os.system(f"taos -s ' use db ;select c6 from {dbname}.stb1 ; '")
tdLog.notice(" this is %d_th restart taosd " %i)
# os.system(f"taos -s ' use db ;select c6 from {dbname}.stb1 ; '")
tdSql.execute(f"use {dbname} ")
tdSql.query(f"select count(*) from {dbname}.stb1")
tdSql.checkRows(1)
@ -85,6 +67,25 @@ class TDTestCase:
tdDnodes.stop(1)
tdDnodes.start(1)
time.sleep(2)
tdSql.query("show databases")
status = False
while status==False:
tdSql.query("show databases")
for db_info in tdSql.queryResult:
if db_info[0]==dbname :
if db_info[15]=="ready":
status = True
tdLog.notice(" ==== database {} status is ready ==== ".format(dbname))
break
else:
status = False
else:
continue
@ -96,7 +97,7 @@ class TDTestCase:
self.prepare_datas()
os.system(f"taos -s ' select c6 from {dbname}.stb1 ; '")
# os.system(f"taos -s ' select c6 from {dbname}.stb1 ; '")
self.restart_taosd_query_sum()
def stop(self):

View File

@ -162,9 +162,9 @@ class TDTestCase:
self.checkcsum(**case6)
# case7~8: nested query
case7 = {"table_expr": "(select c1 from db.stb1 order by tbname ,ts )"}
case7 = {"table_expr": "(select c1 from db.stb1 order by ts, tbname )"}
self.checkcsum(**case7)
case8 = {"table_expr": "(select csum(c1) c1 from db.t1 partition by tbname)"}
case8 = {"table_expr": "(select csum(c1) c1 from db.t1)"}
self.checkcsum(**case8)
# case9~10: mix with tbname/ts/tag/col not support , must partition by alias ,such as select tbname ,csum(c1) partition by tbname

View File

@ -6,7 +6,7 @@ import inspect
from util.log import *
from util.sql import *
from util.cases import *
import random
import random ,math
class TDTestCase:
@ -41,8 +41,8 @@ class TDTestCase:
c2 = random.randint(0,100000)
c3 = random.randint(0,125)
c4 = random.randint(0,125)
c5 = random.random()/1.0
c6 = random.random()/1.0
c5 = random.randint(0,10000)/1000
c6 = random.randint(0,10000)/1000
c7 = "'true'"
c8 = "'binary_val'"
c9 = "'nchar_val'"
@ -72,7 +72,7 @@ class TDTestCase:
comput_irate_value = origin_result[1][0]*1000/( origin_result[1][-1] - origin_result[0][-1])
else:
comput_irate_value = (origin_result[1][0] - origin_result[0][0])*1000/( origin_result[1][-1] - origin_result[0][-1])
if comput_irate_value ==irate_value:
if abs(comput_irate_value - irate_value) <= 0.0000001:
tdLog.info(" irate work as expected , sql is %s "% irate_sql)
else:
tdLog.exit(" irate work not as expected , sql is %s "% irate_sql)

View File

@ -506,7 +506,7 @@ class TDTestCase:
#show create table
tdSql.query("show create table jsons1")
tdSql.checkData(0, 1, 'CREATE STABLE `jsons1` (`ts` TIMESTAMP, `dataint` INT, `databool` BOOL, `datastr` NCHAR(50), `datastrbin` VARCHAR(150)) TAGS (`jtag` JSON) WATERMARK 5000a, 5000a')
tdSql.checkData(0, 1, 'CREATE STABLE `jsons1` (`ts` TIMESTAMP, `dataint` INT, `databool` BOOL, `datastr` NCHAR(50), `datastrbin` VARCHAR(150)) TAGS (`jtag` JSON)')
#test aggregate function:count/avg/twa/irate/sum/stddev/leastsquares
tdSql.query("select count(*) from jsons1 where jtag is not null")

View File

@ -0,0 +1,151 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import random
import string
import os
import sys
import time
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
from util.dnodes import tdDnodes
from util.dnodes import *
import itertools
from itertools import product
from itertools import combinations
from faker import Faker
import subprocess
class TDTestCase:
def caseDescription(self):
'''
case1<xyguo>[TD-12434]:taosdump null nchar/binary length can cause core:taos-tools/src/taosdump.c
case2<xyguo>[TD-12478]:taos_stmt_execute() failed! reason: WAL size exceeds limit
'''
return
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
os.system("rm -rf 5-taos-tools/TD-12478.py.sql")
os.system("rm db*")
os.system("rm dump_result.txt*")
def restartDnodes(self):
tdDnodes.stop(1)
tdDnodes.start(1)
def dropandcreateDB_random(self,n):
self.ts = 1630000000000
fake = Faker('zh_CN')
self.num_random = fake.random_int(min=1000, max=5000, step=1)
print(self.num_random)
for i in range(n):
tdSql.execute('''drop database if exists db ;''')
tdSql.execute('''create database db keep 36500;''')
tdSql.execute('''use db;''')
tdSql.execute('''create stable stable_1 (ts timestamp , q_int int , q_bigint bigint , q_smallint smallint , q_tinyint tinyint , q_float float , q_double double , q_bool bool , q_binary binary(100) , q_nchar nchar(100) , q_ts timestamp , \
q_int_null int , q_bigint_null bigint , q_smallint_null smallint , q_tinyint_null tinyint, q_float_null float , q_double_null double , q_bool_null bool , q_binary_null binary(20) , q_nchar_null nchar(20) , q_ts_null timestamp) \
tags(loc nchar(100) , t_int int , t_bigint bigint , t_smallint smallint , t_tinyint tinyint, t_bool bool , t_binary binary(100) , t_nchar nchar(100) ,t_float float , t_double double , t_ts timestamp);''')
tdSql.execute('''create stable stable_2 (ts timestamp , q_int int , q_bigint bigint , q_smallint smallint , q_tinyint tinyint , q_float float , q_double double , q_bool bool , q_binary binary(100) , q_nchar nchar(100) , q_ts timestamp , \
q_int_null int , q_bigint_null bigint , q_smallint_null smallint , q_tinyint_null tinyint, q_float_null float , q_double_null double , q_bool_null bool , q_binary_null binary(20) , q_nchar_null nchar(20) , q_ts_null timestamp) \
tags(loc nchar(100) , t_int int , t_bigint bigint , t_smallint smallint , t_tinyint tinyint, t_bool bool , t_binary binary(100) , t_nchar nchar(100) ,t_float float , t_double double , t_ts timestamp);''')
tdSql.execute('''create table table_1 using stable_1 tags('table_1', '0' , '0' , '0' , '0' , 0 , '0' , '0' , '0' , '0' ,'0')''')
tdSql.execute('''create table table_2 using stable_1 tags('table_2', '2147483647' , '9223372036854775807' , '32767' , '127' , 1 , 'binary2' , 'nchar2' , '2' , '22' , \'1999-09-09 09:09:09.090\')''')
tdSql.execute('''create table table_3 using stable_1 tags('table_3', '-2147483647' , '-9223372036854775807' , '-32767' , '-127' , false , 'binary3' , 'nchar3nchar3' , '-3.3' , '-33.33' , \'2099-09-09 09:09:09.090\')''')
tdSql.execute('''create table table_21 using stable_2 tags('table_21' , '0' , '0' , '0' , '0' , 0 , '0' , '0' , '0' , '0' ,'0')''')
#regular table
tdSql.execute('''create table regular_table_1 \
(ts timestamp , q_int int , q_bigint bigint , q_smallint smallint , q_tinyint tinyint , q_float float , q_double double , q_bool bool , q_binary binary(100) , q_nchar nchar(100) , q_ts timestamp , \
q_int_null int , q_bigint_null bigint , q_smallint_null smallint , q_tinyint_null tinyint, q_float_null float , q_double_null double , q_bool_null bool , q_binary_null binary(20) , q_nchar_null nchar(20) , q_ts_null timestamp) ;''')
tdSql.execute('''create table regular_table_2 \
(ts timestamp , q_int int , q_bigint bigint , q_smallint smallint , q_tinyint tinyint , q_float float , q_double double , q_bool bool , q_binary binary(100) , q_nchar nchar(100) , q_ts timestamp , \
q_int_null int , q_bigint_null bigint , q_smallint_null smallint , q_tinyint_null tinyint, q_float_null float , q_double_null double , q_bool_null bool , q_binary_null binary(20) , q_nchar_null nchar(20) , q_ts_null timestamp) ;''')
tdSql.execute('''create table regular_table_3 \
(ts timestamp , q_int int , q_bigint bigint , q_smallint smallint , q_tinyint tinyint , q_float float , q_double double , q_bool bool , q_binary binary(100) , q_nchar nchar(100) , q_ts timestamp , \
q_int_null int , q_bigint_null bigint , q_smallint_null smallint , q_tinyint_null tinyint, q_float_null float , q_double_null double , q_bool_null bool , q_binary_null binary(20) , q_nchar_null nchar(20) , q_ts_null timestamp) ;''')
for i in range(self.num_random):
tdSql.execute('''insert into table_1 (ts , q_int , q_bigint , q_smallint , q_tinyint , q_float , q_double , q_bool , q_binary , q_nchar, q_ts) values(%d, %d, %d, %d, %d, %f, %f, 0, 'binary.%s', 'nchar.%s', %d)'''
% (self.ts + i*1000, fake.random_int(min=-2147483647, max=2147483647, step=1),
fake.random_int(min=-9223372036854775807, max=9223372036854775807, step=1),
fake.random_int(min=-32767, max=32767, step=1) , fake.random_int(min=-127, max=127, step=1) ,
fake.pyfloat() , fake.pyfloat() , fake.pystr() , fake.address() , self.ts + i))
tdSql.execute('''insert into regular_table_1 (ts , q_int , q_bigint , q_smallint , q_tinyint , q_float , q_double, q_bool , q_binary , q_nchar, q_ts) values(%d, %d, %d, %d, %d, %f, %f, 0, 'binary.%s', 'nchar.%s', %d)'''
% (self.ts + i*1000, fake.random_int(min=-2147483647, max=2147483647, step=1) ,
fake.random_int(min=-9223372036854775807, max=9223372036854775807, step=1) ,
fake.random_int(min=-32767, max=32767, step=1) , fake.random_int(min=-127, max=127, step=1) ,
fake.pyfloat() , fake.pyfloat() , fake.pystr() , fake.address() , self.ts + i))
tdSql.execute('''insert into table_2 (ts , q_int , q_bigint , q_smallint , q_tinyint , q_float , q_double, q_bool , q_binary , q_nchar, q_ts) values(%d, %d, %d, %d, %d, %f, %f, 1, 'binary.%s', 'nchar.%s', %d)'''
% (self.ts + i*1000, fake.random_int(min=0, max=2147483647, step=1),
fake.random_int(min=0, max=9223372036854775807, step=1),
fake.random_int(min=0, max=32767, step=1) , fake.random_int(min=0, max=127, step=1) ,
fake.pyfloat() , fake.pyfloat() , fake.pystr() , fake.address() , self.ts + i))
tdSql.execute('''insert into regular_table_2 (ts , q_int , q_bigint , q_smallint , q_tinyint , q_float , q_double, q_bool , q_binary , q_nchar, q_ts) values(%d, %d, %d, %d, %d, %f, %f, 1, 'binary.%s', 'nchar.%s', %d)'''
% (self.ts + i*1000, fake.random_int(min=0, max=2147483647, step=1),
fake.random_int(min=0, max=9223372036854775807, step=1),
fake.random_int(min=0, max=32767, step=1) , fake.random_int(min=0, max=127, step=1) ,
fake.pyfloat() , fake.pyfloat() , fake.pystr() , fake.address() , self.ts + i))
tdSql.execute('''insert into table_3 (ts , q_int , q_bigint , q_smallint , q_tinyint , q_float , q_double, q_bool , q_binary , q_nchar, q_ts) values(%d, %d, %d, %d, %d, %f, %f, 0, 'binary.%s', 'nchar.%s', %d)'''
% (self.ts + i*1000, fake.random_int(min=-2147483647, max=0, step=1),
fake.random_int(min=-9223372036854775807, max=0, step=1),
fake.random_int(min=-32767, max=0, step=1) , fake.random_int(min=-127, max=0, step=1) ,
fake.pyfloat() , fake.pyfloat() , fake.pystr() , fake.address() , self.ts + i))
tdSql.execute('''insert into regular_table_3 (ts , q_int , q_bigint , q_smallint , q_tinyint , q_float , q_double, q_bool , q_binary , q_nchar, q_ts) values(%d, %d, %d, %d, %d, %f, %f, 1, 'binary.%s', 'nchar.%s', %d)'''
% (self.ts + i*1000, fake.random_int(min=-2147483647, max=0, step=1),
fake.random_int(min=-9223372036854775807, max=0, step=1),
fake.random_int(min=-32767, max=0, step=1) , fake.random_int(min=-127, max=0, step=1) ,
fake.pyfloat() , fake.pyfloat() , fake.pystr() , fake.address() , self.ts + i))
tdSql.query("select count(*) from stable_1;")
tdSql.checkData(0,0,3*self.num_random)
tdSql.query("select count(*) from regular_table_1;")
tdSql.checkData(0,0,self.num_random)
def run(self):
tdSql.prepare()
dcDB = self.dropandcreateDB_random(1)
assert os.system("taosdump -D db") == 0
assert os.system("taosdump -i . -g") == 0
tdSql.query("select count(*) from stable_1;")
tdSql.checkData(0,0,3*self.num_random)
tdSql.query("select count(*) from regular_table_1;")
tdSql.checkData(0,0,self.num_random)
tdSql.query("select count(*) from regular_table_2;")
tdSql.checkData(0,0,self.num_random)
tdSql.query("select count(*) from regular_table_3;")
tdSql.checkData(0,0,self.num_random)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -68,7 +68,7 @@ class TDTestCase:
def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db',
'dbNumbers': 8,
'dbNumbers': 6,
'dropFlag': 1,
'event': '',
'vgroups': 2,
@ -98,10 +98,10 @@ class TDTestCase:
# fisr add three mnodes;
tdLog.info("fisr add three mnodes and check mnode status")
tdSql.info("create mnode on dnode 2")
tdLog.info("create mnode on dnode 2")
tdSql.execute("create mnode on dnode 2")
clusterComCheck.checkMnodeStatus(2)
tdSql.info("create mnode on dnode 3")
tdLog.info("create mnode on dnode 3")
tdSql.execute("create mnode on dnode 3")
clusterComCheck.checkMnodeStatus(3)
@ -161,7 +161,7 @@ class TDTestCase:
tdLog.info("check dnode number:")
clusterComCheck.checkDnodes(dnodeNumbers)
tdSql.query("show databases")
tdLog.debug("we find %d databases but exepect to create %d databases "%(tdSql.queryRows-2,allDbNumbers-2))
tdLog.debug("we find %d databases but exepect to create %d databases "%(tdSql.queryRows-2,allDbNumbers))
# tdLog.info("check DB Rows:")
# clusterComCheck.checkDbRows(allDbNumbers)
@ -172,7 +172,7 @@ class TDTestCase:
def run(self):
# print(self.master_dnode.cfgDict)
self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=10,stopRole='mnode')
self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=5,stopRole='mnode')
def stop(self):
tdSql.close()

View File

@ -159,7 +159,7 @@ class TDTestCase:
tdLog.info("check dnode number:")
clusterComCheck.checkDnodes(dnodeNumbers)
tdSql.query("show databases")
tdLog.debug("we find %d databases but exepect to create %d databases "%(tdSql.queryRows-2,allDbNumbers-2))
tdLog.debug("we find %d databases but exepect to create %d databases "%(tdSql.queryRows-2,allDbNumbers))
# tdLog.info("check DB Rows:")
# clusterComCheck.checkDbRows(allDbNumbers)
@ -170,7 +170,7 @@ class TDTestCase:
def run(self):
# print(self.master_dnode.cfgDict)
self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=2,stopRole='mnode')
self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=3,stopRole='mnode')
def stop(self):
tdSql.close()

View File

@ -34,7 +34,7 @@ python3 ./test.py -f 1-insert/table_param_ttl.py
python3 ./test.py -f 1-insert/mutil_stage.py
python3 ./test.py -f 1-insert/update_data_muti_rows.py
python3 ./test.py -f 1-insert/db_tb_name_check.py
python3 ./test.py -f 2-query/abs.py
python3 ./test.py -f 2-query/abs.py -R
@ -60,8 +60,8 @@ python3 ./test.py -f 2-query/ceil.py
python3 ./test.py -f 2-query/ceil.py -R
python3 ./test.py -f 2-query/char_length.py
python3 ./test.py -f 2-query/char_length.py -R
# python3 ./test.py -f 2-query/check_tsdb.py
# python3 ./test.py -f 2-query/check_tsdb.py -R
python3 ./test.py -f 2-query/check_tsdb.py
python3 ./test.py -f 2-query/check_tsdb.py -R
python3 ./test.py -f 2-query/concat.py
python3 ./test.py -f 2-query/concat.py -R
python3 ./test.py -f 2-query/concat_ws.py
@ -173,8 +173,9 @@ python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeStop2Follower.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3
# python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDbRep3.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3