Merge branch '3.0' into feature/TD-11274-3.0
This commit is contained in:
commit
e926b914d1
|
@ -287,7 +287,7 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
|
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
|
||||||
|
|
||||||
#define TSDB_MIN_VNODES_PER_DB 1
|
#define TSDB_MIN_VNODES_PER_DB 1
|
||||||
#define TSDB_MAX_VNODES_PER_DB 4096
|
#define TSDB_MAX_VNODES_PER_DB 1024
|
||||||
#define TSDB_DEFAULT_VN_PER_DB 2
|
#define TSDB_DEFAULT_VN_PER_DB 2
|
||||||
#define TSDB_MIN_BUFFER_PER_VNODE 3 // unit MB
|
#define TSDB_MIN_BUFFER_PER_VNODE 3 // unit MB
|
||||||
#define TSDB_MAX_BUFFER_PER_VNODE 16384 // unit MB
|
#define TSDB_MAX_BUFFER_PER_VNODE 16384 // unit MB
|
||||||
|
|
|
@ -108,13 +108,11 @@ static const SSysDbTableSchema userFuncSchema[] = {
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SSysDbTableSchema userIdxSchema[] = {
|
static const SSysDbTableSchema userIdxSchema[] = {
|
||||||
|
{.name = "index_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "index_database", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "index_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
{.name = "column_name", .bytes = SYSTABLE_SCH_COL_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
|
||||||
{.name = "index_type", .bytes = 10, .type = TSDB_DATA_TYPE_VARCHAR},
|
|
||||||
{.name = "index_extensions", .bytes = 256, .type = TSDB_DATA_TYPE_VARCHAR},
|
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SSysDbTableSchema userStbsSchema[] = {
|
static const SSysDbTableSchema userStbsSchema[] = {
|
||||||
|
@ -313,7 +311,7 @@ static const SSysDbTableSchema querySchema[] = {
|
||||||
{.name = "query_id", .bytes = TSDB_QUERY_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "query_id", .bytes = TSDB_QUERY_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "req_id", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
|
{.name = "req_id", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
|
||||||
{.name = "connId", .bytes = 4, .type = TSDB_DATA_TYPE_UINT},
|
{.name = "connId", .bytes = 4, .type = TSDB_DATA_TYPE_UINT},
|
||||||
{.name = "app", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "app", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
@ -329,17 +327,17 @@ static const SSysDbTableSchema appSchema[] = {
|
||||||
{.name = "app_id", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
|
{.name = "app_id", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
|
||||||
{.name = "ip", .bytes = TSDB_IPv4ADDR_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "ip", .bytes = TSDB_IPv4ADDR_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "name", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "name", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "start_time", .bytes = 8 , .type = TSDB_DATA_TYPE_TIMESTAMP},
|
{.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
{.name = "insert_req", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT},
|
{.name = "insert_req", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
|
||||||
{.name = "insert_row", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT},
|
{.name = "insert_row", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
|
||||||
{.name = "insert_time", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT},
|
{.name = "insert_time", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
|
||||||
{.name = "insert_bytes", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT},
|
{.name = "insert_bytes", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
|
||||||
{.name = "fetch_bytes", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT},
|
{.name = "fetch_bytes", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
|
||||||
{.name = "query_time", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT},
|
{.name = "query_time", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
|
||||||
{.name = "show_query", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT},
|
{.name = "show_query", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
|
||||||
{.name = "total_req", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT},
|
{.name = "total_req", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
|
||||||
{.name = "current_req", .bytes = 8 , .type = TSDB_DATA_TYPE_UBIGINT},
|
{.name = "current_req", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
|
||||||
{.name = "last_access", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
{.name = "last_access", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -353,8 +351,7 @@ static const SSysTableMeta perfsMeta[] = {
|
||||||
{TSDB_PERFS_TABLE_TRANS, transSchema, tListLen(transSchema)},
|
{TSDB_PERFS_TABLE_TRANS, transSchema, tListLen(transSchema)},
|
||||||
{TSDB_PERFS_TABLE_SMAS, smaSchema, tListLen(smaSchema)},
|
{TSDB_PERFS_TABLE_SMAS, smaSchema, tListLen(smaSchema)},
|
||||||
{TSDB_PERFS_TABLE_STREAMS, streamSchema, tListLen(streamSchema)},
|
{TSDB_PERFS_TABLE_STREAMS, streamSchema, tListLen(streamSchema)},
|
||||||
{TSDB_PERFS_TABLE_APPS, appSchema, tListLen(appSchema)}
|
{TSDB_PERFS_TABLE_APPS, appSchema, tListLen(appSchema)}};
|
||||||
};
|
|
||||||
|
|
||||||
void getInfosDbMeta(const SSysTableMeta** pInfosTableMeta, size_t* size) {
|
void getInfosDbMeta(const SSysTableMeta** pInfosTableMeta, size_t* size) {
|
||||||
*pInfosTableMeta = infosMeta;
|
*pInfosTableMeta = infosMeta;
|
||||||
|
|
|
@ -549,18 +549,33 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
|
||||||
terrno = TSDB_CODE_MND_DB_OPTION_UNCHANGED;
|
terrno = TSDB_CODE_MND_DB_OPTION_UNCHANGED;
|
||||||
|
|
||||||
if (pAlter->buffer > 0 && pAlter->buffer != pDb->cfg.buffer) {
|
if (pAlter->buffer > 0 && pAlter->buffer != pDb->cfg.buffer) {
|
||||||
|
#if 1
|
||||||
|
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||||
|
return terrno;
|
||||||
|
#else
|
||||||
pDb->cfg.buffer = pAlter->buffer;
|
pDb->cfg.buffer = pAlter->buffer;
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pAlter->pages > 0 && pAlter->pages != pDb->cfg.pages) {
|
if (pAlter->pages > 0 && pAlter->pages != pDb->cfg.pages) {
|
||||||
|
#if 1
|
||||||
|
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||||
|
return terrno;
|
||||||
|
#else
|
||||||
pDb->cfg.pages = pAlter->pages;
|
pDb->cfg.pages = pAlter->pages;
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pAlter->pageSize > 0 && pAlter->pageSize != pDb->cfg.pageSize) {
|
if (pAlter->pageSize > 0 && pAlter->pageSize != pDb->cfg.pageSize) {
|
||||||
|
#if 1
|
||||||
|
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||||
|
return terrno;
|
||||||
|
#else
|
||||||
pDb->cfg.pageSize = pAlter->pageSize;
|
pDb->cfg.pageSize = pAlter->pageSize;
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pAlter->daysPerFile > 0 && pAlter->daysPerFile != pDb->cfg.daysPerFile) {
|
if (pAlter->daysPerFile > 0 && pAlter->daysPerFile != pDb->cfg.daysPerFile) {
|
||||||
|
@ -594,8 +609,12 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pAlter->strict >= 0 && pAlter->strict != pDb->cfg.strict) {
|
if (pAlter->strict >= 0 && pAlter->strict != pDb->cfg.strict) {
|
||||||
|
#if 1
|
||||||
|
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||||
|
#else
|
||||||
pDb->cfg.strict = pAlter->strict;
|
pDb->cfg.strict = pAlter->strict;
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pAlter->cacheLastRow >= 0 && pAlter->cacheLastRow != pDb->cfg.cacheLastRow) {
|
if (pAlter->cacheLastRow >= 0 && pAlter->cacheLastRow != pDb->cfg.cacheLastRow) {
|
||||||
|
@ -604,9 +623,13 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pAlter->replications > 0 && pAlter->replications != pDb->cfg.replications) {
|
if (pAlter->replications > 0 && pAlter->replications != pDb->cfg.replications) {
|
||||||
|
#if 1
|
||||||
|
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||||
|
#else
|
||||||
pDb->cfg.replications = pAlter->replications;
|
pDb->cfg.replications = pAlter->replications;
|
||||||
pDb->vgVersion++;
|
pDb->vgVersion++;
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
return terrno;
|
return terrno;
|
||||||
|
|
|
@ -758,6 +758,11 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (numOfVnodes > 0) {
|
||||||
|
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, numOfVnodes);
|
code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, numOfVnodes);
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
|
|
|
@ -71,7 +71,7 @@ static int32_t convertToRetrieveType(char *name, int32_t len) {
|
||||||
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_FUNCTIONS, len) == 0) {
|
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_FUNCTIONS, len) == 0) {
|
||||||
type = TSDB_MGMT_TABLE_FUNC;
|
type = TSDB_MGMT_TABLE_FUNC;
|
||||||
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_INDEXES, len) == 0) {
|
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_INDEXES, len) == 0) {
|
||||||
// type = TSDB_MGMT_TABLE_INDEX;
|
type = TSDB_MGMT_TABLE_INDEX;
|
||||||
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, len) == 0) {
|
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, len) == 0) {
|
||||||
type = TSDB_MGMT_TABLE_STB;
|
type = TSDB_MGMT_TABLE_STB;
|
||||||
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, len) == 0) {
|
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, len) == 0) {
|
||||||
|
|
|
@ -1147,29 +1147,32 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
|
||||||
|
|
||||||
SName smaName = {0};
|
SName smaName = {0};
|
||||||
tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||||
|
char n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
STR_TO_VARSTR(n1, (char *)tNameGetTableName(&smaName));
|
||||||
|
|
||||||
char n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char n2[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
STR_TO_VARSTR(n, (char *)tNameGetTableName(&smaName));
|
STR_TO_VARSTR(n2, (char *)mndGetDbStr(pDb->name));
|
||||||
cols++;
|
|
||||||
|
|
||||||
SName stbName = {0};
|
SName stbName = {0};
|
||||||
tNameFromString(&stbName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
tNameFromString(&stbName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||||
|
char n3[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
char n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
STR_TO_VARSTR(n3, (char *)tNameGetTableName(&stbName));
|
||||||
STR_TO_VARSTR(n1, (char *)tNameGetTableName(&stbName));
|
|
||||||
|
|
||||||
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)n, false);
|
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)&pSma->createdTime, false);
|
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)n1, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)n1, false);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)n2, false);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)n3, false);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)&pSma->dstVgId, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)&pSma->dstVgId, false);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)&pSma->createdTime, false);
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
sdbRelease(pSdb, pSma);
|
sdbRelease(pSdb, pSma);
|
||||||
}
|
}
|
||||||
|
|
|
@ -273,6 +273,9 @@ _OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq) {
|
static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq) {
|
||||||
|
#if 1
|
||||||
|
return TSDB_CODE_OPS_NOT_SUPPORT;
|
||||||
|
#else
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SSnodeObj *pObj = NULL;
|
SSnodeObj *pObj = NULL;
|
||||||
|
@ -315,6 +318,7 @@ _OVER:
|
||||||
mndReleaseSnode(pMnode, pObj);
|
mndReleaseSnode(pMnode, pObj);
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
return code;
|
return code;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetDropSnodeRedoLogs(STrans *pTrans, SSnodeObj *pObj) {
|
static int32_t mndSetDropSnodeRedoLogs(STrans *pTrans, SSnodeObj *pObj) {
|
||||||
|
@ -386,9 +390,12 @@ _OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq) {
|
static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
#if 1
|
||||||
int32_t code = -1;
|
return TSDB_CODE_OPS_NOT_SUPPORT;
|
||||||
SSnodeObj *pObj = NULL;
|
#else
|
||||||
|
SMnode *pMnode = pReq->info.node;
|
||||||
|
int32_t code = -1;
|
||||||
|
SSnodeObj *pObj = NULL;
|
||||||
SMDropSnodeReq dropReq = {0};
|
SMDropSnodeReq dropReq = {0};
|
||||||
|
|
||||||
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
||||||
|
@ -422,6 +429,7 @@ _OVER:
|
||||||
|
|
||||||
mndReleaseSnode(pMnode, pObj);
|
mndReleaseSnode(pMnode, pObj);
|
||||||
return code;
|
return code;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndRetrieveSnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
static int32_t mndRetrieveSnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||||
|
|
|
@ -247,7 +247,6 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
||||||
pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
|
pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
|
||||||
pObj->status = 0;
|
pObj->status = 0;
|
||||||
|
|
||||||
// TODO
|
|
||||||
pObj->igExpired = pCreate->igExpired;
|
pObj->igExpired = pCreate->igExpired;
|
||||||
pObj->trigger = pCreate->triggerType;
|
pObj->trigger = pCreate->triggerType;
|
||||||
pObj->triggerParam = pCreate->maxDelay;
|
pObj->triggerParam = pCreate->maxDelay;
|
||||||
|
|
|
@ -1219,6 +1219,9 @@ _OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
|
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
|
||||||
|
#if 1
|
||||||
|
return TSDB_CODE_OPS_NOT_SUPPORT;
|
||||||
|
#else
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SDnodeObj *pNew1 = NULL;
|
SDnodeObj *pNew1 = NULL;
|
||||||
SDnodeObj *pNew2 = NULL;
|
SDnodeObj *pNew2 = NULL;
|
||||||
|
@ -1412,6 +1415,7 @@ _OVER:
|
||||||
mndReleaseDb(pMnode, pDb);
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
|
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
|
||||||
|
@ -1711,6 +1715,9 @@ _OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
|
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
|
||||||
|
#if 1
|
||||||
|
return TSDB_CODE_OPS_NOT_SUPPORT;
|
||||||
|
#else
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SArray *pArray = NULL;
|
SArray *pArray = NULL;
|
||||||
|
@ -1759,6 +1766,7 @@ _OVER:
|
||||||
|
|
||||||
taosArrayDestroy(pArray);
|
taosArrayDestroy(pArray);
|
||||||
return code;
|
return code;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; }
|
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; }
|
|
@ -93,7 +93,7 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
|
||||||
|
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_DB, pReq, contLen);
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_DB, pReq, contLen);
|
||||||
ASSERT_NE(pRsp, nullptr);
|
ASSERT_NE(pRsp, nullptr);
|
||||||
ASSERT_EQ(pRsp->code, 0);
|
ASSERT_EQ(pRsp->code, TSDB_CODE_OPS_NOT_SUPPORT);
|
||||||
}
|
}
|
||||||
|
|
||||||
test.SendShowReq(TSDB_MGMT_TABLE_DB, "user_databases", "");
|
test.SendShowReq(TSDB_MGMT_TABLE_DB, "user_databases", "");
|
||||||
|
|
|
@ -406,193 +406,6 @@ OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|
||||||
SMqPollReq* pReq = pMsg->pCont;
|
|
||||||
int64_t consumerId = pReq->consumerId;
|
|
||||||
int64_t timeout = pReq->timeout;
|
|
||||||
int32_t reqEpoch = pReq->epoch;
|
|
||||||
int64_t fetchOffset;
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
// get offset to fetch message
|
|
||||||
if (pReq->currentOffset >= 0) {
|
|
||||||
fetchOffset = pReq->currentOffset + 1;
|
|
||||||
} else {
|
|
||||||
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey);
|
|
||||||
if (pOffset != NULL) {
|
|
||||||
ASSERT(pOffset->val.type == TMQ_OFFSET__LOG);
|
|
||||||
tqDebug("consumer %ld, restore offset of %s on vg %d, offset(type:log) version: %ld", consumerId, pReq->subKey,
|
|
||||||
TD_VID(pTq->pVnode), pOffset->val.version);
|
|
||||||
fetchOffset = pOffset->val.version + 1;
|
|
||||||
} else {
|
|
||||||
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
|
|
||||||
fetchOffset = walGetFirstVer(pTq->pWal);
|
|
||||||
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
|
|
||||||
fetchOffset = walGetCommittedVer(pTq->pWal);
|
|
||||||
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__NONE) {
|
|
||||||
tqError("tmq poll: no offset committed for consumer %ld in vg %d, subkey %s", consumerId, TD_VID(pTq->pVnode),
|
|
||||||
pReq->subKey);
|
|
||||||
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
tqDebug("consumer %ld, restore offset of %s on vg %d failed, config is %ld, set to %ld", consumerId, pReq->subKey,
|
|
||||||
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req offset %ld fetch offset %ld", consumerId,
|
|
||||||
pReq->epoch, TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
|
|
||||||
|
|
||||||
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
|
|
||||||
/*ASSERT(pHandle);*/
|
|
||||||
if (pHandle == NULL) {
|
|
||||||
tqError("tmq poll: no consumer handle for consumer %ld in vg %d, subkey %s", consumerId, TD_VID(pTq->pVnode),
|
|
||||||
pReq->subKey);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pHandle->consumerId != consumerId) {
|
|
||||||
tqError("tmq poll: consumer handle mismatch for consumer %ld in vg %d, subkey %s, handle consumer id %ld",
|
|
||||||
consumerId, TD_VID(pTq->pVnode), pReq->subKey, pHandle->consumerId);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t consumerEpoch = atomic_load_32(&pHandle->epoch);
|
|
||||||
while (consumerEpoch < reqEpoch) {
|
|
||||||
consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch);
|
|
||||||
}
|
|
||||||
|
|
||||||
SMqDataBlkRsp rsp = {0};
|
|
||||||
rsp.reqOffset = pReq->currentOffset;
|
|
||||||
|
|
||||||
rsp.blockData = taosArrayInit(0, sizeof(void*));
|
|
||||||
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
|
|
||||||
|
|
||||||
if (rsp.blockData == NULL || rsp.blockDataLen == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
rsp.withTbName = pReq->withTbName;
|
|
||||||
if (rsp.withTbName) {
|
|
||||||
rsp.blockTbName = taosArrayInit(0, sizeof(void*));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
|
||||||
rsp.withSchema = false;
|
|
||||||
} else {
|
|
||||||
rsp.withSchema = true;
|
|
||||||
rsp.blockSchema = taosArrayInit(0, sizeof(void*));
|
|
||||||
}
|
|
||||||
|
|
||||||
#if 1
|
|
||||||
if (pReq->useSnapshot) {
|
|
||||||
// TODO set ver into snapshot
|
|
||||||
int64_t lastVer = walGetCommittedVer(pTq->pWal);
|
|
||||||
if (rsp.reqOffset < lastVer) {
|
|
||||||
tqInfo("retrieve using snapshot req offset %ld last ver %ld", rsp.reqOffset, lastVer);
|
|
||||||
tqScanSnapshot(pTq, &pHandle->execHandle, &rsp, workerId);
|
|
||||||
|
|
||||||
if (rsp.blockNum != 0) {
|
|
||||||
rsp.withTbName = false;
|
|
||||||
rsp.rspOffset = lastVer;
|
|
||||||
tqInfo("direct send by snapshot req offset %ld rsp offset %ld", rsp.reqOffset, rsp.rspOffset);
|
|
||||||
fetchOffset = lastVer;
|
|
||||||
goto SEND_RSP;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048);
|
|
||||||
if (pHeadWithCkSum == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
walSetReaderCapacity(pHandle->pWalReader, 2048);
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
consumerEpoch = atomic_load_32(&pHandle->epoch);
|
|
||||||
if (consumerEpoch > reqEpoch) {
|
|
||||||
tqWarn("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d, discard req epoch %d",
|
|
||||||
consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tqFetchLog(pTq, pHandle, &fetchOffset, &pHeadWithCkSum) < 0) {
|
|
||||||
// TODO add push mgr
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
SWalCont* pHead = &pHeadWithCkSum->head;
|
|
||||||
|
|
||||||
tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
|
|
||||||
TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
|
|
||||||
|
|
||||||
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
|
||||||
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
|
|
||||||
|
|
||||||
if (tqDataExec(pTq, &pHandle->execHandle, pCont, &rsp, workerId) < 0) {
|
|
||||||
/*ASSERT(0);*/
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ASSERT(pHandle->fetchMeta);
|
|
||||||
ASSERT(IS_META_MSG(pHead->msgType));
|
|
||||||
tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType);
|
|
||||||
SMqMetaRsp metaRsp = {0};
|
|
||||||
metaRsp.reqOffset = pReq->currentOffset;
|
|
||||||
metaRsp.rspOffset = fetchOffset;
|
|
||||||
metaRsp.resMsgType = pHead->msgType;
|
|
||||||
metaRsp.metaRspLen = pHead->bodyLen;
|
|
||||||
metaRsp.metaRsp = pHead->body;
|
|
||||||
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
|
|
||||||
code = -1;
|
|
||||||
goto OVER;
|
|
||||||
}
|
|
||||||
code = 0;
|
|
||||||
goto OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO batch optimization:
|
|
||||||
// TODO continue scan until meeting batch requirement
|
|
||||||
if (rsp.blockNum > 0 /* threshold */) {
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
fetchOffset++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosMemoryFree(pHeadWithCkSum);
|
|
||||||
|
|
||||||
SEND_RSP:
|
|
||||||
ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
|
|
||||||
ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);
|
|
||||||
if (rsp.withSchema) {
|
|
||||||
ASSERT(taosArrayGetSize(rsp.blockSchema) == rsp.blockNum);
|
|
||||||
}
|
|
||||||
|
|
||||||
rsp.rspOffset = fetchOffset;
|
|
||||||
|
|
||||||
if (tqSendDataRsp(pTq, pMsg, pReq, &rsp) < 0) {
|
|
||||||
code = -1;
|
|
||||||
}
|
|
||||||
OVER:
|
|
||||||
// TODO wrap in destroy func
|
|
||||||
taosArrayDestroy(rsp.blockDataLen);
|
|
||||||
taosArrayDestroyP(rsp.blockData, (FDelete)taosMemoryFree);
|
|
||||||
|
|
||||||
if (rsp.withSchema) {
|
|
||||||
taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (rsp.withTbName) {
|
|
||||||
taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree);
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
|
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
||||||
|
|
||||||
|
|
|
@ -1422,7 +1422,7 @@ void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
#ifdef BUF_PAGE_DEBUG
|
#ifdef BUF_PAGE_DEBUG
|
||||||
qDebug("page_setbuf, groupId:%"PRIu64, groupId);
|
qDebug("page_setbuf, groupId:%" PRIu64, groupId);
|
||||||
#endif
|
#endif
|
||||||
doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId);
|
doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId);
|
||||||
|
|
||||||
|
@ -1570,9 +1570,9 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
|
||||||
|
|
||||||
releaseBufPage(pBuf, page);
|
releaseBufPage(pBuf, page);
|
||||||
pBlock->info.rows += pRow->numOfRows;
|
pBlock->info.rows += pRow->numOfRows;
|
||||||
// if (pBlock->info.rows >= pBlock->info.capacity) { // output buffer is full
|
// if (pBlock->info.rows >= pBlock->info.capacity) { // output buffer is full
|
||||||
// break;
|
// break;
|
||||||
// }
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
|
qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
|
||||||
|
@ -2868,7 +2868,24 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
|
||||||
pInfo->cond.twindows[0].skey = oldSkey;
|
pInfo->cond.twindows[0].skey = oldSkey;
|
||||||
pInfo->scanTimes = 0;
|
pInfo->scanTimes = 0;
|
||||||
pInfo->curTWinIdx = 0;
|
pInfo->curTWinIdx = 0;
|
||||||
|
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
|
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
|
||||||
|
bool found = false;
|
||||||
|
for (int32_t i = 0; i < tableSz; i++) {
|
||||||
|
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
|
||||||
|
if (pTableInfo->uid == uid) {
|
||||||
|
found = true;
|
||||||
|
pInfo->currentTable = i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// TODO after processing drop,
|
||||||
|
ASSERT(found);
|
||||||
|
qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts,
|
||||||
|
pInfo->currentTable, tableSz);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
@ -4107,8 +4124,8 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
||||||
} else {
|
} else {
|
||||||
isNull[index++] = 0;
|
isNull[index++] = 0;
|
||||||
char* data = nodesGetValueFromNode(pValue);
|
char* data = nodesGetValueFromNode(pValue);
|
||||||
if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON){
|
if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
|
||||||
if(tTagIsJson(data)){
|
if (tTagIsJson(data)) {
|
||||||
terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
|
terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
|
||||||
taosMemoryFree(keyBuf);
|
taosMemoryFree(keyBuf);
|
||||||
nodesClearList(groupNew);
|
nodesClearList(groupNew);
|
||||||
|
@ -4173,7 +4190,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
|
||||||
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
|
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
|
||||||
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
||||||
if(code){
|
if (code) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -4202,7 +4219,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
};
|
};
|
||||||
if (pHandle) {
|
if (pHandle) {
|
||||||
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
||||||
if(code){
|
if (code) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1329,6 +1329,13 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) {
|
||||||
taosArrayDestroy(pInfo->scanCols);
|
taosArrayDestroy(pInfo->scanCols);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t getSysTableDbNameColId(const char* pTable) {
|
||||||
|
// if (0 == strcmp(TSDB_INS_TABLE_USER_INDEXES, pTable)) {
|
||||||
|
// return 1;
|
||||||
|
// }
|
||||||
|
return TSDB_INS_USER_STABLES_DBNAME_COLID;
|
||||||
|
}
|
||||||
|
|
||||||
EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
|
EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
ENodeType nType = nodeType(pNode);
|
ENodeType nType = nodeType(pNode);
|
||||||
|
@ -1350,7 +1357,7 @@ EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SColumnNode* node = (SColumnNode*)pNode;
|
SColumnNode* node = (SColumnNode*)pNode;
|
||||||
if (TSDB_INS_USER_STABLES_DBNAME_COLID == node->colId) {
|
if (getSysTableDbNameColId(node->tableName) == node->colId) {
|
||||||
*(int32_t*)pContext = 2;
|
*(int32_t*)pContext = 2;
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,8 +9,8 @@
|
||||||
|
|
||||||
## ---- db
|
## ---- db
|
||||||
./test.sh -f tsim/db/alter_option.sim
|
./test.sh -f tsim/db/alter_option.sim
|
||||||
./test.sh -f tsim/db/alter_replica_13.sim
|
# ./test.sh -f tsim/db/alter_replica_13.sim
|
||||||
./test.sh -f tsim/db/alter_replica_31.sim
|
# ./test.sh -f tsim/db/alter_replica_31.sim
|
||||||
./test.sh -f tsim/db/basic1.sim
|
./test.sh -f tsim/db/basic1.sim
|
||||||
./test.sh -f tsim/db/basic2.sim
|
./test.sh -f tsim/db/basic2.sim
|
||||||
./test.sh -f tsim/db/basic3.sim
|
./test.sh -f tsim/db/basic3.sim
|
||||||
|
@ -24,26 +24,26 @@
|
||||||
./test.sh -f tsim/db/taosdlog.sim
|
./test.sh -f tsim/db/taosdlog.sim
|
||||||
|
|
||||||
# ---- dnode
|
# ---- dnode
|
||||||
./test.sh -f tsim/dnode/balance_replica1.sim
|
# ./test.sh -f tsim/dnode/balance_replica1.sim
|
||||||
./test.sh -f tsim/dnode/balance_replica3.sim
|
# ./test.sh -f tsim/dnode/balance_replica3.sim
|
||||||
./test.sh -f tsim/dnode/balance1.sim
|
# ./test.sh -f tsim/dnode/balance1.sim
|
||||||
./test.sh -f tsim/dnode/balance2.sim
|
# ./test.sh -f tsim/dnode/balance2.sim
|
||||||
./test.sh -f tsim/dnode/balance3.sim
|
# ./test.sh -f tsim/dnode/balance3.sim
|
||||||
./test.sh -f tsim/dnode/balancex.sim
|
# ./test.sh -f tsim/dnode/balancex.sim
|
||||||
./test.sh -f tsim/dnode/create_dnode.sim
|
./test.sh -f tsim/dnode/create_dnode.sim
|
||||||
./test.sh -f tsim/dnode/drop_dnode_has_mnode.sim
|
./test.sh -f tsim/dnode/drop_dnode_has_mnode.sim
|
||||||
./test.sh -f tsim/dnode/drop_dnode_has_qnode_snode.sim
|
# ./test.sh -f tsim/dnode/drop_dnode_has_qnode_snode.sim
|
||||||
./test.sh -f tsim/dnode/drop_dnode_has_vnode_replica1.sim
|
# ./test.sh -f tsim/dnode/drop_dnode_has_vnode_replica1.sim
|
||||||
./test.sh -f tsim/dnode/drop_dnode_has_vnode_replica3.sim
|
# ./test.sh -f tsim/dnode/drop_dnode_has_vnode_replica3.sim
|
||||||
./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica1.sim
|
# ./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica1.sim
|
||||||
./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica3.sim
|
# ./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica3.sim
|
||||||
./test.sh -f tsim/dnode/offline_reason.sim
|
./test.sh -f tsim/dnode/offline_reason.sim
|
||||||
./test.sh -f tsim/dnode/redistribute_vgroup_replica1.sim
|
# ./test.sh -f tsim/dnode/redistribute_vgroup_replica1.sim
|
||||||
./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
|
# ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
|
||||||
./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
|
# ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
|
||||||
./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v2.sim
|
# ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v2.sim
|
||||||
./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v3.sim
|
# ./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v3.sim
|
||||||
./test.sh -f tsim/dnode/vnode_clean.sim
|
# ./test.sh -f tsim/dnode/vnode_clean.sim
|
||||||
|
|
||||||
# ---- insert
|
# ---- insert
|
||||||
./test.sh -f tsim/insert/basic0.sim
|
./test.sh -f tsim/insert/basic0.sim
|
||||||
|
@ -71,7 +71,7 @@
|
||||||
./test.sh -f tsim/qnode/basic1.sim
|
./test.sh -f tsim/qnode/basic1.sim
|
||||||
|
|
||||||
# ---- snode
|
# ---- snode
|
||||||
./test.sh -f tsim/snode/basic1.sim
|
# ./test.sh -f tsim/snode/basic1.sim
|
||||||
|
|
||||||
# ---- bnode
|
# ---- bnode
|
||||||
./test.sh -f tsim/bnode/basic1.sim
|
./test.sh -f tsim/bnode/basic1.sim
|
||||||
|
@ -104,7 +104,7 @@
|
||||||
# ./test.sh -f tsim/stream/triggerSession0.sim
|
# ./test.sh -f tsim/stream/triggerSession0.sim
|
||||||
./test.sh -f tsim/stream/partitionby.sim
|
./test.sh -f tsim/stream/partitionby.sim
|
||||||
./test.sh -f tsim/stream/partitionby1.sim
|
./test.sh -f tsim/stream/partitionby1.sim
|
||||||
./test.sh -f tsim/stream/schedSnode.sim
|
# ./test.sh -f tsim/stream/schedSnode.sim
|
||||||
./test.sh -f tsim/stream/windowClose.sim
|
./test.sh -f tsim/stream/windowClose.sim
|
||||||
./test.sh -f tsim/stream/ignoreExpiredData.sim
|
./test.sh -f tsim/stream/ignoreExpiredData.sim
|
||||||
|
|
||||||
|
@ -169,13 +169,13 @@
|
||||||
./test.sh -f tsim/valgrind/checkError.sim -v
|
./test.sh -f tsim/valgrind/checkError.sim -v
|
||||||
|
|
||||||
# --- vnode
|
# --- vnode
|
||||||
#./test.sh -f tsim/vnode/replica3_basic.sim
|
# ./test.sh -f tsim/vnode/replica3_basic.sim
|
||||||
#./test.sh -f tsim/vnode/replica3_repeat.sim
|
# ./test.sh -f tsim/vnode/replica3_repeat.sim
|
||||||
./test.sh -f tsim/vnode/replica3_vgroup.sim
|
# ./test.sh -f tsim/vnode/replica3_vgroup.sim
|
||||||
#./test.sh -f tsim/vnode/replica3_many.sim
|
# ./test.sh -f tsim/vnode/replica3_many.sim
|
||||||
#./test.sh -f tsim/vnode/replica3_import.sim
|
# ./test.sh -f tsim/vnode/replica3_import.sim
|
||||||
./test.sh -f tsim/vnode/stable_balance_replica1.sim
|
# ./test.sh -f tsim/vnode/stable_balance_replica1.sim
|
||||||
./test.sh -f tsim/vnode/stable_dnode2_stop.sim
|
# ./test.sh -f tsim/vnode/stable_dnode2_stop.sim
|
||||||
./test.sh -f tsim/vnode/stable_dnode2.sim
|
./test.sh -f tsim/vnode/stable_dnode2.sim
|
||||||
./test.sh -f tsim/vnode/stable_dnode3.sim
|
./test.sh -f tsim/vnode/stable_dnode3.sim
|
||||||
./test.sh -f tsim/vnode/stable_replica3_dnode6.sim
|
./test.sh -f tsim/vnode/stable_replica3_dnode6.sim
|
||||||
|
|
|
@ -50,6 +50,21 @@ sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) t
|
||||||
print --> create sma
|
print --> create sma
|
||||||
sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) interval(6m,10s) sliding(6m);
|
sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) interval(6m,10s) sliding(6m);
|
||||||
|
|
||||||
|
print --> show sma
|
||||||
|
sql show indexes from stb from d1;
|
||||||
|
if $rows != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data[0][0] != sma_index_name1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data[0][1] != d1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data[0][2] != stb then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
print --> drop stb
|
print --> drop stb
|
||||||
sql drop table stb;
|
sql drop table stb;
|
||||||
|
|
||||||
|
@ -61,6 +76,21 @@ sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) t
|
||||||
print --> create sma
|
print --> create sma
|
||||||
sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) interval(6m,10s) sliding(6m);
|
sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) interval(6m,10s) sliding(6m);
|
||||||
|
|
||||||
|
print --> show sma
|
||||||
|
sql show indexes from stb from d1;
|
||||||
|
if $rows != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data[0][0] != sma_index_name1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data[0][1] != d1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data[0][2] != stb then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
print --> drop stb
|
print --> drop stb
|
||||||
sql drop table stb;
|
sql drop table stb;
|
||||||
|
|
||||||
|
|
|
@ -16,135 +16,150 @@ import string
|
||||||
from util.log import *
|
from util.log import *
|
||||||
from util.cases import *
|
from util.cases import *
|
||||||
from util.sql import *
|
from util.sql import *
|
||||||
|
from util.sqlset import *
|
||||||
|
from util import constant
|
||||||
|
from util.common import *
|
||||||
class TDTestCase:
|
class TDTestCase:
|
||||||
def init(self, conn, logSql):
|
def init(self, conn, logSql):
|
||||||
tdLog.debug("start to execute %s" % __file__)
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
tdSql.init(conn.cursor())
|
tdSql.init(conn.cursor())
|
||||||
|
self.setsql = TDSetSql()
|
||||||
|
self.ntbname = 'ntb'
|
||||||
|
self.stbname = 'stb'
|
||||||
|
self.binary_length = 20 # the length of binary for column_dict
|
||||||
|
self.nchar_length = 20 # the length of nchar for column_dict
|
||||||
|
self.column_dict = {
|
||||||
|
'ts' : 'timestamp',
|
||||||
|
'col1': 'tinyint',
|
||||||
|
'col2': 'smallint',
|
||||||
|
'col3': 'int',
|
||||||
|
'col4': 'bigint',
|
||||||
|
'col5': 'tinyint unsigned',
|
||||||
|
'col6': 'smallint unsigned',
|
||||||
|
'col7': 'int unsigned',
|
||||||
|
'col8': 'bigint unsigned',
|
||||||
|
'col9': 'float',
|
||||||
|
'col10': 'double',
|
||||||
|
'col11': 'bool',
|
||||||
|
'col12': f'binary({self.binary_length})',
|
||||||
|
'col13': f'nchar({self.nchar_length})'
|
||||||
|
}
|
||||||
|
self.tag_dict = {
|
||||||
|
'ts_tag' : 'timestamp',
|
||||||
|
't1': 'tinyint',
|
||||||
|
't2': 'smallint',
|
||||||
|
't3': 'int',
|
||||||
|
't4': 'bigint',
|
||||||
|
't5': 'tinyint unsigned',
|
||||||
|
't6': 'smallint unsigned',
|
||||||
|
't7': 'int unsigned',
|
||||||
|
't8': 'bigint unsigned',
|
||||||
|
't9': 'float',
|
||||||
|
't10': 'double',
|
||||||
|
't11': 'bool',
|
||||||
|
't12': f'binary({self.binary_length})',
|
||||||
|
't13': f'nchar({self.nchar_length})'
|
||||||
|
}
|
||||||
|
self.tag_list = [
|
||||||
|
f'now,1,2,3,4,5,6,7,8,9.9,10.1,true,"abcd","涛思数据"'
|
||||||
|
]
|
||||||
|
self.tbnum = 1
|
||||||
|
self.values_list = [
|
||||||
|
f'now,1,2,3,4,5,6,7,8,9.9,10.1,true,"abcd","涛思数据"'
|
||||||
|
]
|
||||||
|
self.column_add_dict = {
|
||||||
|
'col_time' : 'timestamp',
|
||||||
|
'col_tinyint' : 'tinyint',
|
||||||
|
'col_smallint' : 'smallint',
|
||||||
|
'col_int' : 'int',
|
||||||
|
'col_bigint' : 'bigint',
|
||||||
|
'col_untinyint' : 'tinyint unsigned',
|
||||||
|
'col_smallint' : 'smallint unsigned',
|
||||||
|
'col_int' : 'int unsigned',
|
||||||
|
'col_bigint' : 'bigint unsigned',
|
||||||
|
'col_bool' : 'bool',
|
||||||
|
'col_float' : 'float',
|
||||||
|
'col_double' : 'double',
|
||||||
|
'col_binary' : f'binary({constant.BINARY_LENGTH_MAX})',
|
||||||
|
'col_nchar' : f'nchar({constant.NCAHR_LENGTH_MAX})'
|
||||||
|
|
||||||
def get_long_name(self, length, mode="mixed"):
|
}
|
||||||
"""
|
|
||||||
generate long name
|
|
||||||
mode could be numbers/letters/letters_mixed/mixed
|
|
||||||
"""
|
|
||||||
if mode == "numbers":
|
|
||||||
population = string.digits
|
|
||||||
elif mode == "letters":
|
|
||||||
population = string.ascii_letters.lower()
|
|
||||||
elif mode == "letters_mixed":
|
|
||||||
population = string.ascii_letters.upper() + string.ascii_letters.lower()
|
|
||||||
else:
|
|
||||||
population = string.ascii_letters.lower() + string.digits
|
|
||||||
return "".join(random.choices(population, k=length))
|
|
||||||
def alter_stable_column_check(self,dbname,stbname,tbname):
|
|
||||||
tdSql.execute(f'create database if not exists {dbname}')
|
|
||||||
tdSql.execute(f'use {dbname}')
|
|
||||||
tdSql.execute(
|
|
||||||
f'create stable {stbname} (ts timestamp, c1 tinyint, c2 smallint, c3 int, \
|
|
||||||
c4 bigint, c5 tinyint unsigned, c6 smallint unsigned, c7 int unsigned, c8 bigint unsigned, c9 float, c10 double, c11 bool,c12 binary(20),c13 nchar(20)) tags(t0 int) ')
|
|
||||||
tdSql.execute(f'create table {tbname} using {stbname} tags(1)')
|
|
||||||
tdSql.execute(f'insert into {tbname} values (now,1,2,3,4,5,6,7,8,9.9,10.1,true,"abcd","涛思数据")')
|
|
||||||
tdSql.execute(f'alter stable {stbname} add column c14 int')
|
|
||||||
tdSql.query(f'select c14 from {stbname}')
|
|
||||||
tdSql.checkRows(1)
|
|
||||||
tdSql.execute(f'alter stable {stbname} add column `c15` int')
|
|
||||||
tdSql.query(f'select c15 from {stbname}')
|
|
||||||
tdSql.checkRows(1)
|
|
||||||
tdSql.query(f'describe {stbname}')
|
|
||||||
tdSql.checkRows(17)
|
|
||||||
tdSql.execute(f'alter stable {stbname} drop column c14')
|
|
||||||
tdSql.query(f'describe {stbname}')
|
|
||||||
tdSql.checkRows(16)
|
|
||||||
tdSql.execute(f'alter stable {stbname} drop column `c15`')
|
|
||||||
tdSql.query(f'describe {stbname}')
|
|
||||||
tdSql.checkRows(15)
|
|
||||||
tdSql.execute(f'alter stable {stbname} modify column c12 binary(30)')
|
|
||||||
tdSql.query(f'describe {stbname}')
|
|
||||||
tdSql.checkData(12,2,30)
|
|
||||||
tdSql.execute(f'alter stable {stbname} modify column `c12` binary(35)')
|
|
||||||
tdSql.query(f'describe {stbname}')
|
|
||||||
tdSql.checkData(12,2,35)
|
|
||||||
tdSql.error(f'alter stable {stbname} modify column `c12` binary(34)')
|
|
||||||
tdSql.execute(f'alter stable {stbname} modify column c13 nchar(30)')
|
|
||||||
tdSql.query(f'describe {stbname}')
|
|
||||||
tdSql.checkData(13,2,30)
|
|
||||||
tdSql.error(f'alter stable {stbname} modify column c13 nchar(29)')
|
|
||||||
tdSql.error(f'alter stable {stbname} rename column c1 c21')
|
|
||||||
tdSql.error(f'alter stable {stbname} modify column c1 int')
|
|
||||||
tdSql.error(f'alter stable {stbname} modify column c4 int')
|
|
||||||
tdSql.error(f'alter stable {stbname} modify column c8 int')
|
|
||||||
tdSql.error(f'alter stable {stbname} modify column c1 unsigned int')
|
|
||||||
tdSql.error(f'alter stable {stbname} modify column c9 double')
|
|
||||||
tdSql.error(f'alter stable {stbname} modify column c10 float')
|
|
||||||
tdSql.error(f'alter stable {stbname} modify column c11 int')
|
|
||||||
tdSql.error(f'alter stable {stbname} drop tag t0')
|
|
||||||
tdSql.execute(f'drop database {dbname}')
|
|
||||||
|
|
||||||
def alter_stable_tag_check(self,dbname,stbname,tbname):
|
|
||||||
tdSql.execute(f'create database if not exists {dbname}')
|
|
||||||
tdSql.execute(f'use {dbname}')
|
|
||||||
tdSql.execute(
|
|
||||||
f'create stable {stbname} (ts timestamp, c1 int) tags(ts_tag timestamp, t1 tinyint, t2 smallint, t3 int, \
|
|
||||||
t4 bigint, t5 tinyint unsigned, t6 smallint unsigned, t7 int unsigned, t8 bigint unsigned, t9 float, t10 double, t11 bool,t12 binary(20),t13 nchar(20)) ')
|
|
||||||
tdSql.execute(f'create table {tbname} using {stbname} tags(now,1,2,3,4,5,6,7,8,9.9,10.1,true,"abcd","涛思数据")')
|
|
||||||
tdSql.execute(f'insert into {tbname} values(now,1)')
|
|
||||||
|
|
||||||
tdSql.execute(f'alter stable {stbname} add tag t14 int')
|
|
||||||
tdSql.query(f'select t14 from {stbname}')
|
|
||||||
tdSql.checkRows(1)
|
|
||||||
tdSql.execute(f'alter stable {stbname} add tag `t15` int')
|
|
||||||
tdSql.query(f'select t14 from {stbname}')
|
|
||||||
tdSql.checkRows(1)
|
|
||||||
tdSql.query(f'describe {stbname}')
|
|
||||||
tdSql.checkRows(18)
|
|
||||||
tdSql.execute(f'alter stable {stbname} drop tag t14')
|
|
||||||
tdSql.query(f'describe {stbname}')
|
|
||||||
tdSql.checkRows(17)
|
|
||||||
tdSql.execute(f'alter stable {stbname} drop tag `t15`')
|
|
||||||
tdSql.query(f'describe {stbname}')
|
|
||||||
tdSql.checkRows(16)
|
|
||||||
tdSql.execute(f'alter stable {stbname} modify tag t12 binary(30)')
|
|
||||||
tdSql.query(f'describe {stbname}')
|
|
||||||
tdSql.checkData(14,2,30)
|
|
||||||
tdSql.execute(f'alter stable {stbname} modify tag `t12` binary(35)')
|
|
||||||
tdSql.query(f'describe {stbname}')
|
|
||||||
tdSql.checkData(14,2,35)
|
|
||||||
tdSql.error(f'alter stable {stbname} modify tag `t12` binary(34)')
|
|
||||||
tdSql.execute(f'alter stable {stbname} modify tag t13 nchar(30)')
|
|
||||||
tdSql.query(f'describe {stbname}')
|
|
||||||
tdSql.checkData(15,2,30)
|
|
||||||
tdSql.error(f'alter stable {stbname} modify tag t13 nchar(29)')
|
|
||||||
tdSql.execute(f'alter table {stbname} rename tag t1 t21')
|
|
||||||
tdSql.query(f'describe {stbname}')
|
|
||||||
tdSql.checkData(3,0,'t21')
|
|
||||||
tdSql.execute(f'alter table {stbname} rename tag `t21` t1')
|
|
||||||
tdSql.query(f'describe {stbname}')
|
|
||||||
tdSql.checkData(3,0,'t1')
|
|
||||||
|
|
||||||
for i in ['bigint','unsigned int','float','double','binary(10)','nchar(10)']:
|
|
||||||
for j in [1,2,3]:
|
|
||||||
tdSql.error(f'alter stable {stbname} modify tag t{j} {i}')
|
|
||||||
for i in ['int','unsigned int','float','binary(10)','nchar(10)']:
|
|
||||||
tdSql.error(f'alter stable {stbname} modify tag t8 {i}')
|
|
||||||
tdSql.error(f'alter stable {stbname} modify tag t4 int')
|
|
||||||
tdSql.error(f'alter stable {stbname} drop column t0')
|
|
||||||
#!bug TD-16410
|
|
||||||
# tdSql.error(f'alter stable {tbname} set tag t1=100 ')
|
|
||||||
# tdSql.execute(f'create table ntb (ts timestamp,c0 int)')
|
|
||||||
tdSql.error(f'alter stable ntb add column c2 ')
|
|
||||||
tdSql.execute(f'drop database {dbname}')
|
|
||||||
|
|
||||||
|
def alter_stable_check(self):
|
||||||
|
tdSql.prepare()
|
||||||
|
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict))
|
||||||
|
tdSql.execute(self.setsql.set_create_normaltable_sql(self.ntbname,self.column_dict))
|
||||||
|
for i in self.values_list:
|
||||||
|
tdSql.execute(f'insert into {self.ntbname} values({i})')
|
||||||
|
for i in range(self.tbnum):
|
||||||
|
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})')
|
||||||
|
for j in self.values_list:
|
||||||
|
tdSql.execute(f'insert into {self.stbname}_{i} values({j})')
|
||||||
|
for key,values in self.column_add_dict.items():
|
||||||
|
tdSql.execute(f'alter stable {self.stbname} add column {key} {values}')
|
||||||
|
tdSql.query(f'describe {self.stbname}')
|
||||||
|
tdSql.checkRows(len(self.column_dict)+len(self.tag_dict)+1)
|
||||||
|
for i in range(self.tbnum):
|
||||||
|
tdSql.query(f'describe {self.stbname}_{i}')
|
||||||
|
tdSql.checkRows(len(self.column_dict)+len(self.tag_dict)+1)
|
||||||
|
tdSql.query(f'select {key} from {self.stbname}_{i}')
|
||||||
|
tdSql.checkRows(len(self.values_list))
|
||||||
|
for i in range(self.tbnum):
|
||||||
|
tdSql.error(f'alter stable {self.stbname}_{i} add column {key} {values}')
|
||||||
|
tdSql.error(f'alter stable {self.stbname}_{i} drop column {key}')
|
||||||
|
#! bug TD-16921
|
||||||
|
#tdSql.error(f'alter stable {self.ntbname} add column {key} {values}')
|
||||||
|
#tdSql.error(f'alter stable {self.ntbname} drop column {key}')
|
||||||
|
tdSql.execute(f'alter stable {self.stbname} drop column {key}')
|
||||||
|
tdSql.query(f'describe {self.stbname}')
|
||||||
|
tdSql.checkRows(len(self.column_dict)+len(self.tag_dict))
|
||||||
|
for i in range(self.tbnum):
|
||||||
|
tdSql.query(f'describe {self.stbname}_{i}')
|
||||||
|
tdSql.checkRows(len(self.column_dict)+len(self.tag_dict))
|
||||||
|
tdSql.error(f'select {key} from {self.stbname} ')
|
||||||
|
for key,values in self.column_dict.items():
|
||||||
|
if 'binary' in values.lower():
|
||||||
|
v = f'binary({self.binary_length+1})'
|
||||||
|
v_error = f'binary({self.binary_length-1})'
|
||||||
|
tdSql.error(f'alter stable {self.stbname} modify column {key} {v_error}')
|
||||||
|
tdSql.execute(f'alter stable {self.stbname} modify column {key} {v}')
|
||||||
|
tdSql.query(f'describe {self.stbname}')
|
||||||
|
result = tdCom.getOneRow(1,'VARCHAR')
|
||||||
|
tdSql.checkEqual(result[0][2],self.binary_length+1)
|
||||||
|
for i in range(self.tbnum):
|
||||||
|
tdSql.query(f'describe {self.stbname}_{i}')
|
||||||
|
result = tdCom.getOneRow(1,'VARCHAR')
|
||||||
|
tdSql.checkEqual(result[0][2],self.binary_length+1)
|
||||||
|
tdSql.error(f'alter stable {self.stbname}_{i} modify column {key} {v}')
|
||||||
|
#! bug TD-16921
|
||||||
|
# tdSql.error(f'alter stable {self.ntbname} modify column {key} {v}')
|
||||||
|
elif 'nchar' in values.lower():
|
||||||
|
v = f'nchar({self.binary_length+1})'
|
||||||
|
v_error = f'nchar({self.binary_length-1})'
|
||||||
|
tdSql.error(f'alter stable {self.stbname} modify column {key} {v_error}')
|
||||||
|
tdSql.execute(f'alter stable {self.stbname} modify column {key} {v}')
|
||||||
|
tdSql.query(f'describe {self.stbname}')
|
||||||
|
result = tdCom.getOneRow(1,'NCHAR')
|
||||||
|
tdSql.checkEqual(result[0][2],self.binary_length+1)
|
||||||
|
for i in range(self.tbnum):
|
||||||
|
tdSql.query(f'describe {self.stbname}_{i}')
|
||||||
|
result = tdCom.getOneRow(1,'NCHAR')
|
||||||
|
tdSql.checkEqual(result[0][2],self.binary_length+1)
|
||||||
|
tdSql.error(f'alter stable {self.stbname}_{i} modify column {key} {v}')
|
||||||
|
#! bug TD-16921
|
||||||
|
#tdSql.error(f'alter stable {self.ntbname} modify column {key} {v}')
|
||||||
|
else:
|
||||||
|
for v in self.column_dict.values():
|
||||||
|
tdSql.error(f'alter stable {self.stbname} modify column {key} {v}')
|
||||||
|
# tdSql.error(f'alter stable {self.ntbname} modify column {key} {v}')
|
||||||
|
for i in range(self.tbnum):
|
||||||
|
tdSql.error(f'alter stable {self.stbname}_{i} modify column {key} {v}')
|
||||||
def run(self):
|
def run(self):
|
||||||
|
|
||||||
dbname = self.get_long_name(length=10, mode="letters")
|
self.alter_stable_check()
|
||||||
stbname = self.get_long_name(length=5, mode="letters")
|
|
||||||
tbname = self.get_long_name(length=5, mode="letters")
|
|
||||||
self.alter_stable_column_check(dbname,stbname,tbname)
|
|
||||||
self.alter_stable_tag_check(dbname,stbname,tbname)
|
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
tdLog.success("%s successfully executed" % __file__)
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
||||||
tdCases.addWindows(__file__, TDTestCase())
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
tdCases.addLinux(__file__, TDTestCase())
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
|
|
@ -126,6 +126,7 @@ class TDTestCase:
|
||||||
tdSql.execute(f'alter table {self.ntbname} rename column {key} {rename_str}')
|
tdSql.execute(f'alter table {self.ntbname} rename column {key} {rename_str}')
|
||||||
tdSql.query(f'select {rename_str} from {self.ntbname}')
|
tdSql.query(f'select {rename_str} from {self.ntbname}')
|
||||||
tdSql.checkRows(1)
|
tdSql.checkRows(1)
|
||||||
|
tdSql.error(f'select {key} from {self.ntbname}')
|
||||||
|
|
||||||
def alter_check_tb(self):
|
def alter_check_tb(self):
|
||||||
tag_tinyint = random.randint(constant.TINYINT_MIN,constant.TINYINT_MAX)
|
tag_tinyint = random.randint(constant.TINYINT_MIN,constant.TINYINT_MAX)
|
||||||
|
@ -277,7 +278,11 @@ class TDTestCase:
|
||||||
else:
|
else:
|
||||||
for v in self.column_dict.values():
|
for v in self.column_dict.values():
|
||||||
tdSql.error(f'alter table {self.stbname} modify column {key} {v}')
|
tdSql.error(f'alter table {self.stbname} modify column {key} {v}')
|
||||||
|
for key,values in self.column_dict.items():
|
||||||
|
rename_str = f'{tdCom.getLongName(constant.COL_NAME_LENGTH_MAX,"letters")}'
|
||||||
|
tdSql.error(f'alter table {self.stbname} rename column {key} {rename_str}')
|
||||||
|
for i in range(self.tbnum):
|
||||||
|
tdSql.error(f'alter table {self.stbname}_{i} rename column {key} {rename_str}')
|
||||||
def run(self):
|
def run(self):
|
||||||
self.alter_check_ntb()
|
self.alter_check_ntb()
|
||||||
self.alter_check_tb()
|
self.alter_check_tb()
|
||||||
|
|
|
@ -40,6 +40,7 @@ class TDTestCase:
|
||||||
self.time_unit = ['b','u','a','s','m','h','d','w']
|
self.time_unit = ['b','u','a','s','m','h','d','w']
|
||||||
self.symbol = ['+','-','*','/']
|
self.symbol = ['+','-','*','/']
|
||||||
self.error_values = [1.5,'abc','"abc"','!@','today()']
|
self.error_values = [1.5,'abc','"abc"','!@','today()']
|
||||||
|
self.db_percision = ['ms','us','ns']
|
||||||
def tbtype_check(self,tb_type):
|
def tbtype_check(self,tb_type):
|
||||||
if tb_type == 'normal table' or tb_type == 'child table':
|
if tb_type == 'normal table' or tb_type == 'child table':
|
||||||
tdSql.checkRows(len(self.values_list))
|
tdSql.checkRows(len(self.values_list))
|
||||||
|
@ -70,23 +71,29 @@ class TDTestCase:
|
||||||
tdSql.checkData(i,0,None)
|
tdSql.checkData(i,0,None)
|
||||||
|
|
||||||
def now_check_ntb(self):
|
def now_check_ntb(self):
|
||||||
tdSql.prepare()
|
for time_unit in self.db_percision:
|
||||||
tdSql.execute(self.setsql.set_create_normaltable_sql(self.ntbname,self.column_dict))
|
tdSql.execute(f'create database db precision "{time_unit}"')
|
||||||
for value in self.values_list:
|
tdSql.execute('use db')
|
||||||
tdSql.execute(
|
tdSql.execute(self.setsql.set_create_normaltable_sql(self.ntbname,self.column_dict))
|
||||||
f'insert into {self.ntbname} values({value})')
|
for value in self.values_list:
|
||||||
self.data_check(self.ntbname,'normal table')
|
tdSql.execute(
|
||||||
|
f'insert into {self.ntbname} values({value})')
|
||||||
|
self.data_check(self.ntbname,'normal table')
|
||||||
|
tdSql.execute('drop database db')
|
||||||
|
|
||||||
def now_check_stb(self):
|
def now_check_stb(self):
|
||||||
tdSql.prepare()
|
for time_unit in self.db_percision:
|
||||||
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict))
|
tdSql.execute(f'create database db precision "{time_unit}"')
|
||||||
for i in range(self.tbnum):
|
tdSql.execute('use db')
|
||||||
tdSql.execute(f"create table {self.stbname}_{i} using {self.stbname} tags({self.tag_values[0]})")
|
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict))
|
||||||
for value in self.values_list:
|
for i in range(self.tbnum):
|
||||||
tdSql.execute(f'insert into {self.stbname}_{i} values({value})')
|
tdSql.execute(f"create table {self.stbname}_{i} using {self.stbname} tags({self.tag_values[0]})")
|
||||||
for i in range(self.tbnum):
|
for value in self.values_list:
|
||||||
self.data_check(f'{self.stbname}_{i}','child table')
|
tdSql.execute(f'insert into {self.stbname}_{i} values({value})')
|
||||||
self.data_check(self.stbname,'stable')
|
for i in range(self.tbnum):
|
||||||
|
self.data_check(f'{self.stbname}_{i}','child table')
|
||||||
|
self.data_check(self.stbname,'stable')
|
||||||
|
tdSql.execute('drop database db')
|
||||||
def run(self): # sourcery skip: extract-duplicate-method
|
def run(self): # sourcery skip: extract-duplicate-method
|
||||||
|
|
||||||
self.now_check_ntb()
|
self.now_check_ntb()
|
||||||
|
|
|
@ -5,6 +5,7 @@ from util.log import *
|
||||||
from util.sql import *
|
from util.sql import *
|
||||||
from util.cases import *
|
from util.cases import *
|
||||||
import datetime
|
import datetime
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
|
||||||
class TDTestCase:
|
class TDTestCase:
|
||||||
|
@ -12,8 +13,8 @@ class TDTestCase:
|
||||||
def init(self, conn, logSql):
|
def init(self, conn, logSql):
|
||||||
tdLog.debug(f"start to excute {__file__}")
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
tdSql.init(conn.cursor())
|
tdSql.init(conn.cursor())
|
||||||
self.today_date = datetime.datetime.strptime(
|
self.today_date = datetime.datetime.strptime(datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d")
|
||||||
datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d")
|
self.today_ts = datetime.datetime.strptime(datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d").timestamp()
|
||||||
self.time_unit = ['b','u','a','s','m','h','d','w']
|
self.time_unit = ['b','u','a','s','m','h','d','w']
|
||||||
self.error_param = ['1.5','abc','!@#','"abc"','today()']
|
self.error_param = ['1.5','abc','!@#','"abc"','today()']
|
||||||
self.arithmetic_operators = ['+','-','*','/']
|
self.arithmetic_operators = ['+','-','*','/']
|
||||||
|
@ -41,7 +42,7 @@ class TDTestCase:
|
||||||
f'today(),3,3.333,333.333333,now()',
|
f'today(),3,3.333,333.333333,now()',
|
||||||
f'today()-1d,10,11.11,99.999999,now()',
|
f'today()-1d,10,11.11,99.999999,now()',
|
||||||
f'today()+1d,1,1.55,100.555555,today()']
|
f'today()+1d,1,1.55,100.555555,today()']
|
||||||
|
self.db_percision = ['ms','us','ns']
|
||||||
def set_create_normaltable_sql(self, ntbname, column_dict):
|
def set_create_normaltable_sql(self, ntbname, column_dict):
|
||||||
column_sql = ''
|
column_sql = ''
|
||||||
for k, v in column_dict.items():
|
for k, v in column_dict.items():
|
||||||
|
@ -57,7 +58,8 @@ class TDTestCase:
|
||||||
tag_sql += f"{k} {v},"
|
tag_sql += f"{k} {v},"
|
||||||
create_stb_sql = f'create table {stbname} ({column_sql[:-1]}) tags({tag_sql[:-1]})'
|
create_stb_sql = f'create table {stbname} ({column_sql[:-1]}) tags({tag_sql[:-1]})'
|
||||||
return create_stb_sql
|
return create_stb_sql
|
||||||
def data_check(self,column_dict={},tbname = '',values_list = [],tb_num = 1,tb = 'tb'):
|
|
||||||
|
def data_check(self,column_dict={},tbname = '',values_list = [],tb_num = 1,tb = 'tb',precision = 'ms'):
|
||||||
for k,v in column_dict.items():
|
for k,v in column_dict.items():
|
||||||
num_up = 0
|
num_up = 0
|
||||||
num_down = 0
|
num_down = 0
|
||||||
|
@ -65,12 +67,27 @@ class TDTestCase:
|
||||||
if v.lower() == 'timestamp':
|
if v.lower() == 'timestamp':
|
||||||
tdSql.query(f'select {k} from {tbname}')
|
tdSql.query(f'select {k} from {tbname}')
|
||||||
for i in tdSql.queryResult:
|
for i in tdSql.queryResult:
|
||||||
if i[0] > self.today_date:
|
if precision == 'ms':
|
||||||
num_up += 1
|
if int(i[0].timestamp())*1000 > int(self.today_ts)*1000:
|
||||||
elif i[0] == self.today_date:
|
num_up += 1
|
||||||
num_same += 1
|
elif int(i[0].timestamp())*1000 == int(self.today_ts)*1000:
|
||||||
elif i[0] < self.today_date:
|
num_same += 1
|
||||||
num_down += 1
|
elif int(i[0].timestamp())*1000 < int(self.today_ts)*1000:
|
||||||
|
num_down += 1
|
||||||
|
elif precision == 'us':
|
||||||
|
if int(i[0].timestamp())*1000000 > int(self.today_ts)*1000000:
|
||||||
|
num_up += 1
|
||||||
|
elif int(i[0].timestamp())*1000000 == int(self.today_ts)*1000000:
|
||||||
|
num_same += 1
|
||||||
|
elif int(i[0].timestamp())*1000000 < int(self.today_ts)*1000000:
|
||||||
|
num_down += 1
|
||||||
|
elif precision == 'ns':
|
||||||
|
if i[0] > int(self.today_ts)*1000000000:
|
||||||
|
num_up += 1
|
||||||
|
elif i[0] == int(self.today_ts)*1000000000:
|
||||||
|
num_same += 1
|
||||||
|
elif i[0] < int(self.today_ts)*1000000000:
|
||||||
|
num_down += 1
|
||||||
tdSql.query(f"select today() from {tbname}")
|
tdSql.query(f"select today() from {tbname}")
|
||||||
tdSql.checkRows(len(values_list)*tb_num)
|
tdSql.checkRows(len(values_list)*tb_num)
|
||||||
tdSql.checkData(0, 0, str(self.today_date))
|
tdSql.checkData(0, 0, str(self.today_date))
|
||||||
|
@ -130,32 +147,36 @@ class TDTestCase:
|
||||||
for i in range(num_same):
|
for i in range(num_same):
|
||||||
tdSql.checkData(i, 0, str(self.today_date))
|
tdSql.checkData(i, 0, str(self.today_date))
|
||||||
def today_check_ntb(self):
|
def today_check_ntb(self):
|
||||||
tdSql.prepare()
|
for time_unit in self.db_percision:
|
||||||
tdSql.execute(self.set_create_normaltable_sql(self.ntbname,self.column_dict))
|
print(time_unit)
|
||||||
for i in self.values_list:
|
tdSql.execute(f'create database db precision "{time_unit}"')
|
||||||
tdSql.execute(
|
tdSql.execute('use db')
|
||||||
f'insert into {self.ntbname} values({i})')
|
tdSql.execute(self.set_create_normaltable_sql(self.ntbname,self.column_dict))
|
||||||
self.data_check(self.column_dict,self.ntbname,self.values_list)
|
for i in self.values_list:
|
||||||
tdSql.execute('drop database db')
|
tdSql.execute(
|
||||||
|
f'insert into {self.ntbname} values({i})')
|
||||||
|
self.data_check(self.column_dict,self.ntbname,self.values_list,1,'tb',time_unit)
|
||||||
|
tdSql.execute('drop database db')
|
||||||
def today_check_stb_tb(self):
|
def today_check_stb_tb(self):
|
||||||
tdSql.prepare()
|
for time_unit in self.db_percision:
|
||||||
tdSql.execute(self.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict))
|
print(time_unit)
|
||||||
for i in range(self.tbnum):
|
tdSql.execute(f'create database db precision "{time_unit}"')
|
||||||
tdSql.execute(f'create table if not exists {self.stbname}_{i} using {self.stbname} tags({self.tag_values[i]})')
|
tdSql.execute('use db')
|
||||||
for j in self.values_list:
|
tdSql.execute(self.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict))
|
||||||
tdSql.execute(f'insert into {self.stbname}_{i} values ({j})')
|
for i in range(self.tbnum):
|
||||||
# check child table
|
tdSql.execute(f'create table if not exists {self.stbname}_{i} using {self.stbname} tags({self.tag_values[i]})')
|
||||||
for i in range(self.tbnum):
|
for j in self.values_list:
|
||||||
self.data_check(self.column_dict,f'{self.stbname}_{i}',self.values_list)
|
tdSql.execute(f'insert into {self.stbname}_{i} values ({j})')
|
||||||
# check stable
|
# check child table
|
||||||
self.data_check(self.column_dict,self.stbname,self.values_list,self.tbnum,'stb')
|
for i in range(self.tbnum):
|
||||||
tdSql.execute('drop database db')
|
self.data_check(self.column_dict,f'{self.stbname}_{i}',self.values_list,1,'tb',time_unit)
|
||||||
|
# check stable
|
||||||
|
self.data_check(self.column_dict,self.stbname,self.values_list,self.tbnum,'stb',time_unit)
|
||||||
|
tdSql.execute('drop database db')
|
||||||
|
|
||||||
def run(self): # sourcery skip: extract-duplicate-method
|
def run(self): # sourcery skip: extract-duplicate-method
|
||||||
|
|
||||||
tdLog.printNoPrefix("==========check today() for normal table ==========")
|
|
||||||
self.today_check_ntb()
|
self.today_check_ntb()
|
||||||
tdLog.printNoPrefix("==========check today() for stable and child table==========")
|
|
||||||
self.today_check_stb_tb()
|
self.today_check_stb_tb()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
|
|
@ -0,0 +1,242 @@
|
||||||
|
|
||||||
|
import taos
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import socket
|
||||||
|
import os
|
||||||
|
import threading
|
||||||
|
import math
|
||||||
|
|
||||||
|
from util.log import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.dnodes import *
|
||||||
|
from util.common import *
|
||||||
|
sys.path.append("./7-tmq")
|
||||||
|
from tmqCommon import *
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
def __init__(self):
|
||||||
|
self.vgroups = 4
|
||||||
|
self.ctbNum = 10
|
||||||
|
self.rowsPerTbl = 10000
|
||||||
|
|
||||||
|
def init(self, conn, logSql):
|
||||||
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
|
tdSql.init(conn.cursor(), False)
|
||||||
|
|
||||||
|
def prepareTestEnv(self):
|
||||||
|
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
|
||||||
|
paraDict = {'dbName': 'dbt',
|
||||||
|
'dropFlag': 1,
|
||||||
|
'event': '',
|
||||||
|
'vgroups': 1,
|
||||||
|
'stbName': 'stb',
|
||||||
|
'colPrefix': 'c',
|
||||||
|
'tagPrefix': 't',
|
||||||
|
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||||
|
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||||
|
'ctbPrefix': 'ctb',
|
||||||
|
'ctbStartIdx': 0,
|
||||||
|
'ctbNum': 10,
|
||||||
|
'rowsPerTbl': 10000,
|
||||||
|
'batchNum': 10,
|
||||||
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
|
'pollDelay': 3,
|
||||||
|
'showMsg': 1,
|
||||||
|
'showRow': 1,
|
||||||
|
'snapshot': 1}
|
||||||
|
|
||||||
|
paraDict['vgroups'] = self.vgroups
|
||||||
|
paraDict['ctbNum'] = self.ctbNum
|
||||||
|
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||||
|
|
||||||
|
tmqCom.initConsumerTable()
|
||||||
|
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
||||||
|
tdLog.info("create stb")
|
||||||
|
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
||||||
|
tdLog.info("create ctb")
|
||||||
|
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
|
||||||
|
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||||
|
tdLog.info("insert data")
|
||||||
|
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||||
|
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||||
|
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||||
|
|
||||||
|
tdLog.info("restart taosd to ensure that the data falls into the disk")
|
||||||
|
tdDnodes.stop(1)
|
||||||
|
# tdDnodes.start(1)
|
||||||
|
tdDnodes.starttaosd(1)
|
||||||
|
return
|
||||||
|
|
||||||
|
def tmqCase1(self):
|
||||||
|
tdLog.printNoPrefix("======== test case 1: ")
|
||||||
|
paraDict = {'dbName': 'dbt',
|
||||||
|
'dropFlag': 1,
|
||||||
|
'event': '',
|
||||||
|
'vgroups': 1,
|
||||||
|
'stbName': 'stb',
|
||||||
|
'colPrefix': 'c',
|
||||||
|
'tagPrefix': 't',
|
||||||
|
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||||
|
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||||
|
'ctbPrefix': 'ctb',
|
||||||
|
'ctbStartIdx': 0,
|
||||||
|
'ctbNum': 10,
|
||||||
|
'rowsPerTbl': 10000,
|
||||||
|
'batchNum': 10,
|
||||||
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
|
'pollDelay': 3,
|
||||||
|
'showMsg': 1,
|
||||||
|
'showRow': 1,
|
||||||
|
'snapshot': 1}
|
||||||
|
|
||||||
|
paraDict['vgroups'] = self.vgroups
|
||||||
|
paraDict['ctbNum'] = self.ctbNum
|
||||||
|
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||||
|
|
||||||
|
topicNameList = ['topic1']
|
||||||
|
expectRowsList = []
|
||||||
|
tmqCom.initConsumerTable()
|
||||||
|
|
||||||
|
tdLog.info("create topics from stb with filter")
|
||||||
|
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
|
||||||
|
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
|
||||||
|
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||||
|
tdLog.info("create topic sql: %s"%sqlString)
|
||||||
|
tdSql.execute(sqlString)
|
||||||
|
tdSql.query(queryString)
|
||||||
|
expectRowsList.append(tdSql.getRows())
|
||||||
|
|
||||||
|
# init consume info, and start tmq_sim, then check consume result
|
||||||
|
tdLog.info("insert consume info to consume processor")
|
||||||
|
consumerId = 0
|
||||||
|
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
|
||||||
|
topicList = topicNameList[0]
|
||||||
|
ifcheckdata = 1
|
||||||
|
ifManualCommit = 1
|
||||||
|
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
|
||||||
|
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
tdLog.info("start consume processor")
|
||||||
|
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||||
|
tdLog.info("wait the consume result")
|
||||||
|
|
||||||
|
expectRows = 1
|
||||||
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
|
||||||
|
if expectRowsList[0] != resultList[0]:
|
||||||
|
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
|
||||||
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
|
# tmqCom.checkFileContent(consumerId, queryString)
|
||||||
|
|
||||||
|
time.sleep(10)
|
||||||
|
for i in range(len(topicNameList)):
|
||||||
|
tdSql.query("drop topic %s"%topicNameList[i])
|
||||||
|
|
||||||
|
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||||
|
|
||||||
|
def tmqCase2(self):
|
||||||
|
tdLog.printNoPrefix("======== test case 2: ")
|
||||||
|
paraDict = {'dbName': 'dbt',
|
||||||
|
'dropFlag': 1,
|
||||||
|
'event': '',
|
||||||
|
'vgroups': 1,
|
||||||
|
'stbName': 'stb',
|
||||||
|
'colPrefix': 'c',
|
||||||
|
'tagPrefix': 't',
|
||||||
|
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||||
|
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||||
|
'ctbPrefix': 'ctb',
|
||||||
|
'ctbStartIdx': 0,
|
||||||
|
'ctbNum': 10,
|
||||||
|
'rowsPerTbl': 10000,
|
||||||
|
'batchNum': 10,
|
||||||
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
|
'pollDelay': 3,
|
||||||
|
'showMsg': 1,
|
||||||
|
'showRow': 1,
|
||||||
|
'snapshot': 1}
|
||||||
|
|
||||||
|
paraDict['vgroups'] = self.vgroups
|
||||||
|
paraDict['ctbNum'] = self.ctbNum
|
||||||
|
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||||
|
|
||||||
|
topicNameList = ['topic1']
|
||||||
|
expectRowsList = []
|
||||||
|
tmqCom.initConsumerTable()
|
||||||
|
|
||||||
|
tdLog.info("create topics from stb with filter")
|
||||||
|
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
|
||||||
|
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
|
||||||
|
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||||
|
tdLog.info("create topic sql: %s"%sqlString)
|
||||||
|
tdSql.execute(sqlString)
|
||||||
|
tdSql.query(queryString)
|
||||||
|
expectRowsList.append(tdSql.getRows())
|
||||||
|
totalRowsInserted = expectRowsList[0]
|
||||||
|
|
||||||
|
# init consume info, and start tmq_sim, then check consume result
|
||||||
|
tdLog.info("insert consume info to consume processor")
|
||||||
|
consumerId = 1
|
||||||
|
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] / 3)
|
||||||
|
topicList = topicNameList[0]
|
||||||
|
ifcheckdata = 1
|
||||||
|
ifManualCommit = 1
|
||||||
|
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
|
||||||
|
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
tdLog.info("start consume processor 0")
|
||||||
|
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||||
|
tdLog.info("wait the consume result")
|
||||||
|
|
||||||
|
expectRows = 1
|
||||||
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
|
||||||
|
if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]):
|
||||||
|
tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted))
|
||||||
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
|
firstConsumeRows = resultList[0]
|
||||||
|
|
||||||
|
# reinit consume info, and start tmq_sim, then check consume result
|
||||||
|
tmqCom.initConsumerTable()
|
||||||
|
consumerId = 2
|
||||||
|
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3)
|
||||||
|
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
tdLog.info("start consume processor 1")
|
||||||
|
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||||
|
tdLog.info("wait the consume result")
|
||||||
|
|
||||||
|
expectRows = 1
|
||||||
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
|
||||||
|
actConsumeTotalRows = firstConsumeRows + resultList[0]
|
||||||
|
|
||||||
|
if not (expectrowcnt >= resultList[0] and totalRowsInserted == actConsumeTotalRows):
|
||||||
|
tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0]))
|
||||||
|
tdLog.info("and sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
|
||||||
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
|
time.sleep(10)
|
||||||
|
for i in range(len(topicNameList)):
|
||||||
|
tdSql.query("drop topic %s"%topicNameList[i])
|
||||||
|
|
||||||
|
tdLog.printNoPrefix("======== test case 2 end ...... ")
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
tdSql.prepare()
|
||||||
|
self.prepareTestEnv()
|
||||||
|
self.tmqCase1()
|
||||||
|
self.tmqCase2()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
event = threading.Event()
|
||||||
|
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -116,8 +116,8 @@ python3 ./test.py -f 2-query/irate.py
|
||||||
python3 ./test.py -f 2-query/function_null.py
|
python3 ./test.py -f 2-query/function_null.py
|
||||||
python3 ./test.py -f 2-query/queryQnode.py
|
python3 ./test.py -f 2-query/queryQnode.py
|
||||||
|
|
||||||
python3 ./test.py -f 6-cluster/5dnode1mnode.py
|
#python3 ./test.py -f 6-cluster/5dnode1mnode.py
|
||||||
python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3
|
#python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3
|
||||||
#python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3
|
#python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3
|
||||||
#python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3
|
#python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3
|
||||||
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3
|
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3
|
||||||
|
@ -157,3 +157,4 @@ python3 ./test.py -f 7-tmq/tmqShow.py
|
||||||
python3 ./test.py -f 7-tmq/tmqAlterSchema.py
|
python3 ./test.py -f 7-tmq/tmqAlterSchema.py
|
||||||
python3 ./test.py -f 7-tmq/tmqConsFromTsdb.py
|
python3 ./test.py -f 7-tmq/tmqConsFromTsdb.py
|
||||||
python3 ./test.py -f 7-tmq/tmqConsFromTsdb1.py
|
python3 ./test.py -f 7-tmq/tmqConsFromTsdb1.py
|
||||||
|
python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg.py
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
Subproject commit 5fdd694621fbb7bd2d6102ff4feaec92a7001038
|
Subproject commit 7105027650b51e701cfa1dac11b8fb42d447dd01
|
Loading…
Reference in New Issue