Merge branch '3.0' into feature/3_liaohj

This commit is contained in:
Haojun Liao 2022-07-13 16:41:26 +08:00
commit dfb0511e17
34 changed files with 2581 additions and 2239 deletions

View File

@ -223,8 +223,8 @@ typedef struct SRequestObj {
SArray* tableList; SArray* tableList;
SQueryExecMetric metric; SQueryExecMetric metric;
SRequestSendRecvBody body; SRequestSendRecvBody body;
bool stableQuery; // todo refactor bool stableQuery; // todo refactor
bool validateOnly; // todo refactor bool validateOnly; // todo refactor
bool killed; bool killed;
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
@ -325,7 +325,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList); int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql, SRequestObj** pRequest); int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
SRequestObj** pRequest);
void taos_close_internal(void* taos); void taos_close_internal(void* taos);
@ -359,9 +360,6 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList); // todo move to clie
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); // todo move to xxx int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); // todo move to xxx
bool qnodeRequired(SRequestObj* pRequest); bool qnodeRequired(SRequestObj* pRequest);
void initTscQhandle();
void cleanupTscQhandle();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -35,22 +35,10 @@ SAppInfo appInfo;
int32_t clientReqRefPool = -1; int32_t clientReqRefPool = -1;
int32_t clientConnRefPool = -1; int32_t clientConnRefPool = -1;
void *tscQhandle = NULL;
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT; static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
volatile int32_t tscInitRes = 0; volatile int32_t tscInitRes = 0;
void initTscQhandle() { static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
// init handle
tscQhandle = taosInitScheduler(4096, 5, "tsc");
}
void cleanupTscQhandle() {
// destroy handle
taosCleanUpScheduler(tscQhandle);
}
static int32_t registerRequest(SRequestObj *pRequest, STscObj* pTscObj) {
// connection has been released already, abort creating request. // connection has been released already, abort creating request.
pRequest->self = taosAddRef(clientReqRefPool, pRequest); pRequest->self = taosAddRef(clientReqRefPool, pRequest);
@ -72,7 +60,7 @@ static int32_t registerRequest(SRequestObj *pRequest, STscObj* pTscObj) {
static void deregisterRequest(SRequestObj *pRequest) { static void deregisterRequest(SRequestObj *pRequest) {
assert(pRequest != NULL); assert(pRequest != NULL);
STscObj * pTscObj = pRequest->pTscObj; STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1); int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
@ -97,7 +85,8 @@ void closeTransporter(SAppInstInfo *pAppInfo) {
static bool clientRpcRfp(int32_t code, tmsg_t msgType) { static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
if (NEED_REDIRECT_ERROR(code)) { if (NEED_REDIRECT_ERROR(code)) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
msgType == TDMT_SCH_MERGE_FETCH) {
return false; return false;
} }
return true; return true;
@ -251,7 +240,7 @@ void *createRequest(uint64_t connId, int32_t type) {
return NULL; return NULL;
} }
STscObj* pTscObj = acquireTscObj(connId); STscObj *pTscObj = acquireTscObj(connId);
if (pTscObj == NULL) { if (pTscObj == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED; terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL; return NULL;
@ -348,7 +337,6 @@ void taos_init_imp(void) {
// In the APIs of other program language, taos_cleanup is not available yet. // In the APIs of other program language, taos_cleanup is not available yet.
// So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning. // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
atexit(taos_cleanup); atexit(taos_cleanup);
initTscQhandle();
errno = TSDB_CODE_SUCCESS; errno = TSDB_CODE_SUCCESS;
taosSeedRand(taosGetTimestampSec()); taosSeedRand(taosGetTimestampSec());
@ -407,7 +395,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
return 0; return 0;
} }
SConfig * pCfg = taosGetCfg(); SConfig *pCfg = taosGetCfg();
SConfigItem *pItem = NULL; SConfigItem *pItem = NULL;
switch (option) { switch (option) {

View File

@ -1274,8 +1274,8 @@ typedef struct SchedArg {
SEpSet* pEpset; SEpSet* pEpset;
} SchedArg; } SchedArg;
void doProcessMsgFromServer(SSchedMsg* schedMsg) { int32_t doProcessMsgFromServer(void* param) {
SchedArg* arg = (SchedArg*)schedMsg->ahandle; SchedArg* arg = (SchedArg*)param;
SRpcMsg* pMsg = &arg->msg; SRpcMsg* pMsg = &arg->msg;
SEpSet* pEpSet = arg->pEpset; SEpSet* pEpSet = arg->pEpset;
@ -1328,11 +1328,10 @@ void doProcessMsgFromServer(SSchedMsg* schedMsg) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
destroySendMsgInfo(pSendInfo); destroySendMsgInfo(pSendInfo);
taosMemoryFree(arg); taosMemoryFree(arg);
return TSDB_CODE_SUCCESS;
} }
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SSchedMsg schedMsg = {0};
SEpSet* tEpSet = NULL; SEpSet* tEpSet = NULL;
if (pEpSet != NULL) { if (pEpSet != NULL) {
tEpSet = taosMemoryCalloc(1, sizeof(SEpSet)); tEpSet = taosMemoryCalloc(1, sizeof(SEpSet));
@ -1343,9 +1342,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
arg->msg = *pMsg; arg->msg = *pMsg;
arg->pEpset = tEpSet; arg->pEpset = tEpSet;
schedMsg.fp = doProcessMsgFromServer; taosAsyncExec(doProcessMsgFromServer, arg, NULL);
schedMsg.ahandle = arg;
taosScheduleTask(tscQhandle, &schedMsg);
} }
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) { TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {

View File

@ -72,7 +72,6 @@ void taos_cleanup(void) {
catalogDestroy(); catalogDestroy();
schedulerDestroy(); schedulerDestroy();
cleanupTscQhandle();
rpcCleanup(); rpcCleanup();
tscInfo("all local resources released"); tscInfo("all local resources released");
taosCleanupCfg(); taosCleanupCfg();
@ -242,7 +241,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
#endif #endif
} else if (TD_RES_TMQ(res)) { } else if (TD_RES_TMQ(res)) {
SMqRspObj * msg = ((SMqRspObj *)res); SMqRspObj *msg = ((SMqRspObj *)res);
SReqResultInfo *pResultInfo; SReqResultInfo *pResultInfo;
if (msg->resIter == -1) { if (msg->resIter == -1) {
pResultInfo = tmqGetNextResInfo(res, true); pResultInfo = tmqGetNextResInfo(res, true);
@ -418,7 +417,7 @@ int taos_affected_rows(TAOS_RES *res) {
return 0; return 0;
} }
SRequestObj * pRequest = (SRequestObj *)res; SRequestObj *pRequest = (SRequestObj *)res;
SReqResultInfo *pResInfo = &pRequest->body.resInfo; SReqResultInfo *pResInfo = &pRequest->body.resInfo;
return pResInfo->numOfRows; return pResInfo->numOfRows;
} }
@ -601,7 +600,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
} }
SReqResultInfo *pResInfo = tscGetCurResInfo(res); SReqResultInfo *pResInfo = tscGetCurResInfo(res);
TAOS_FIELD * pField = &pResInfo->userFields[columnIndex]; TAOS_FIELD *pField = &pResInfo->userFields[columnIndex];
if (!IS_VAR_DATA_TYPE(pField->type)) { if (!IS_VAR_DATA_TYPE(pField->type)) {
return 0; return 0;
} }
@ -645,8 +644,8 @@ const char *taos_get_server_info(TAOS *taos) {
typedef struct SqlParseWrapper { typedef struct SqlParseWrapper {
SParseContext *pCtx; SParseContext *pCtx;
SCatalogReq catalogReq; SCatalogReq catalogReq;
SRequestObj * pRequest; SRequestObj *pRequest;
SQuery * pQuery; SQuery *pQuery;
} SqlParseWrapper; } SqlParseWrapper;
static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) { static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) {
@ -665,8 +664,8 @@ static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) {
void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
SqlParseWrapper *pWrapper = (SqlParseWrapper *)param; SqlParseWrapper *pWrapper = (SqlParseWrapper *)param;
SQuery * pQuery = pWrapper->pQuery; SQuery *pQuery = pWrapper->pQuery;
SRequestObj * pRequest = pWrapper->pRequest; SRequestObj *pRequest = pWrapper->pRequest;
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
@ -684,7 +683,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
destorySqlParseWrapper(pWrapper); destorySqlParseWrapper(pWrapper);
tscDebug("0x%"PRIx64" analysis semantics completed, start async query, reqId:0x%"PRIx64, pRequest->self, pRequest->requestId); tscDebug("0x%" PRIx64 " analysis semantics completed, start async query, reqId:0x%" PRIx64, pRequest->self,
pRequest->requestId);
launchAsyncQuery(pRequest, pQuery, pResultMeta); launchAsyncQuery(pRequest, pQuery, pResultMeta);
} else { } else {
destorySqlParseWrapper(pWrapper); destorySqlParseWrapper(pWrapper);
@ -705,7 +705,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
} }
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) { void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
int64_t connId = *(int64_t*)taos; int64_t connId = *(int64_t *)taos;
taosAsyncQueryImpl(connId, sql, fp, param, false); taosAsyncQueryImpl(connId, sql, fp, param, false);
} }
@ -739,7 +739,7 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
SParseContext *pCxt = NULL; SParseContext *pCxt = NULL;
STscObj * pTscObj = pRequest->pTscObj; STscObj *pTscObj = pRequest->pTscObj;
int32_t code = 0; int32_t code = 0;
if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) { if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) {
@ -874,9 +874,9 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
SSchedulerReq req = { SSchedulerReq req = {
.syncReq = false, .syncReq = false,
.fetchFp = fetchCallback, .fetchFp = fetchCallback,
.cbParam = pRequest, .cbParam = pRequest,
}; };
schedulerFetchRows(pRequest->body.queryJob, &req); schedulerFetchRows(pRequest->body.queryJob, &req);
@ -886,7 +886,7 @@ void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
ASSERT(res != NULL && fp != NULL); ASSERT(res != NULL && fp != NULL);
ASSERT(TD_RES_QUERY(res)); ASSERT(TD_RES_QUERY(res));
SRequestObj *pRequest = res; SRequestObj *pRequest = res;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo; SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
// set the current block is all consumed // set the current block is all consumed
@ -928,7 +928,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
int64_t connId = *(int64_t *)taos; int64_t connId = *(int64_t *)taos;
const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list
int32_t code = 0; int32_t code = 0;
SRequestObj * pRequest = NULL; SRequestObj *pRequest = NULL;
SCatalogReq catalogReq = {0}; SCatalogReq catalogReq = {0};
if (NULL == tableNameList) { if (NULL == tableNameList) {
@ -950,7 +950,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
goto _return; goto _return;
} }
STscObj* pTscObj = pRequest->pTscObj; STscObj *pTscObj = pRequest->pTscObj;
code = transferTableNameList(tableNameList, pTscObj->acctId, pTscObj->db, &catalogReq.pTableMeta); code = transferTableNameList(tableNameList, pTscObj->acctId, pTscObj->db, &catalogReq.pTableMeta);
if (code) { if (code) {
goto _return; goto _return;
@ -972,7 +972,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
goto _return; goto _return;
} }
SSyncQueryParam* pParam = pRequest->body.param; SSyncQueryParam *pParam = pRequest->body.param;
tsem_wait(&pParam->sem); tsem_wait(&pParam->sem);
_return: _return:

View File

@ -179,7 +179,6 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (code != 0) { if (code != 0) {
terrno = code; terrno = code;
if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash); if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
taosMemoryFreeClear(output.dbVgroup);
tscError("0x%" PRIx64 " failed to build use db output since %s", pRequest->requestId, terrstr()); tscError("0x%" PRIx64 " failed to build use db output since %s", pRequest->requestId, terrstr());
} else if (output.dbVgroup && output.dbVgroup->vgHash) { } else if (output.dbVgroup && output.dbVgroup->vgHash) {
@ -189,12 +188,14 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (code1 != TSDB_CODE_SUCCESS) { if (code1 != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId, tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
tstrerror(code1)); tstrerror(code1));
taosMemoryFreeClear(output.dbVgroup);
} else { } else {
catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup); catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup);
output.dbVgroup = NULL;
} }
} }
taosMemoryFreeClear(output.dbVgroup);
tFreeSUsedbRsp(&usedbRsp); tFreeSUsedbRsp(&usedbRsp);
char db[TSDB_DB_NAME_LEN] = {0}; char db[TSDB_DB_NAME_LEN] = {0};

View File

@ -56,20 +56,22 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex); sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex);
} }
if (pMgmt->transId == transId) { if (pMgmt->transId == transId && transId != 0) {
if (pMgmt->errCode != 0) { if (pMgmt->errCode != 0) {
mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode)); mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode));
} }
pMgmt->transId = 0; pMgmt->transId = 0;
tsem_post(&pMgmt->syncSem); tsem_post(&pMgmt->syncSem);
} else { } else {
#if 1
mError("trans:%d, invalid commit msg since trandId not match with %d", transId, pMgmt->transId);
#else
STrans *pTrans = mndAcquireTrans(pMnode, transId); STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) { if (pTrans != NULL) {
mndTransExecute(pMnode, pTrans); mndTransExecute(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans);
} }
#if 0 // sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA);
sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA);
#endif #endif
} }
} }

View File

@ -642,6 +642,7 @@ void ctgFreeSTableIndex(void *info);
void ctgClearSubTaskRes(SCtgSubRes *pRes); void ctgClearSubTaskRes(SCtgSubRes *pRes);
void ctgFreeQNode(SCtgQNode *node); void ctgFreeQNode(SCtgQNode *node);
void ctgClearHandle(SCatalog* pCtg); void ctgClearHandle(SCatalog* pCtg);
void ctgFreeTbCacheImpl(SCtgTbCache *pCache);
extern SCatalogMgmt gCtgMgmt; extern SCatalogMgmt gCtgMgmt;

View File

@ -647,6 +647,8 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
CTG_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
bool syncOp = operation->syncOp;
char* opName = gCtgCacheOperation[operation->opId].name;
if (operation->syncOp) { if (operation->syncOp) {
tsem_init(&operation->rspSem, 0, 0); tsem_init(&operation->rspSem, 0, 0);
} }
@ -664,14 +666,14 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
gCtgMgmt.queue.tail = node; gCtgMgmt.queue.tail = node;
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
ctgDebug("action [%s] added into queue", opName);
CTG_QUEUE_INC(); CTG_QUEUE_INC();
CTG_RT_STAT_INC(numOfOpEnqueue, 1); CTG_RT_STAT_INC(numOfOpEnqueue, 1);
tsem_post(&gCtgMgmt.queue.reqSem); tsem_post(&gCtgMgmt.queue.reqSem);
ctgDebug("action [%s] added into queue", gCtgCacheOperation[operation->opId].name); if (syncOp) {
if (operation->syncOp) {
tsem_wait(&operation->rspSem); tsem_wait(&operation->rspSem);
taosMemoryFree(operation); taosMemoryFree(operation);
} }
@ -840,6 +842,7 @@ _return:
ctgFreeVgInfo(dbInfo); ctgFreeVgInfo(dbInfo);
taosMemoryFreeClear(op->data); taosMemoryFreeClear(op->data);
taosMemoryFreeClear(op);
CTG_RET(code); CTG_RET(code);
} }
@ -852,7 +855,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy
SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg)); SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg));
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
char *p = strchr(output->dbFName, '.'); char *p = strchr(output->dbFName, '.');
@ -871,6 +874,11 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy
_return: _return:
if (output) {
taosMemoryFree(output->tbMeta);
taosMemoryFree(output);
}
taosMemoryFreeClear(msg); taosMemoryFreeClear(msg);
CTG_RET(code); CTG_RET(code);
@ -1753,6 +1761,16 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
CTG_CACHE_STAT_DEC(numOfStb, 1); CTG_CACHE_STAT_DEC(numOfStb, 1);
} }
SCtgTbCache* pTbCache = taosHashGet(dbCache->tbCache, msg->stbName, strlen(msg->stbName));
if (NULL == pTbCache) {
ctgDebug("stb %s already not in cache", msg->stbName);
goto _return;
}
CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
ctgFreeTbCacheImpl(pTbCache);
CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);
if (taosHashRemove(dbCache->tbCache, msg->stbName, strlen(msg->stbName))) { if (taosHashRemove(dbCache->tbCache, msg->stbName, strlen(msg->stbName))) {
ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid); ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
} else { } else {
@ -1780,14 +1798,24 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
ctgGetDBCache(pCtg, msg->dbFName, &dbCache); ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
if (NULL == dbCache) { if (NULL == dbCache) {
return TSDB_CODE_SUCCESS; goto _return;
} }
if (dbCache->dbId != msg->dbId) { if (dbCache->dbId != msg->dbId) {
ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%"PRIx64", dbFName:%s, tbName:%s", msg->dbId, dbCache->dbId, msg->dbFName, msg->tbName); ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%"PRIx64", dbFName:%s, tbName:%s", msg->dbId, dbCache->dbId, msg->dbFName, msg->tbName);
return TSDB_CODE_SUCCESS; goto _return;
} }
SCtgTbCache* pTbCache = taosHashGet(dbCache->tbCache, msg->tbName, strlen(msg->tbName));
if (NULL == pTbCache) {
ctgDebug("tb %s already not in cache", msg->tbName);
goto _return;
}
CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
ctgFreeTbCacheImpl(pTbCache);
CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);
if (taosHashRemove(dbCache->tbCache, msg->tbName, strlen(msg->tbName))) { if (taosHashRemove(dbCache->tbCache, msg->tbName, strlen(msg->tbName))) {
ctgError("tb %s not exist in cache, dbFName:%s", msg->tbName, msg->dbFName); ctgError("tb %s not exist in cache, dbFName:%s", msg->tbName, msg->dbFName);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
@ -2063,6 +2091,8 @@ void* ctgUpdateThreadFunc(void* param) {
if (operation->syncOp) { if (operation->syncOp) {
tsem_post(&operation->rspSem); tsem_post(&operation->rspSem);
} else {
taosMemoryFreeClear(operation);
} }
CTG_RT_STAT_INC(numOfOpDequeue, 1); CTG_RT_STAT_INC(numOfOpDequeue, 1);

View File

@ -261,6 +261,8 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
_return: _return:
taosMemoryFree(pMsg->pData);
if (pJob) { if (pJob) {
taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId); taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId);
} }

View File

@ -152,6 +152,7 @@ void ctgFreeStbMetaCache(SCtgDBCache *dbCache) {
} }
void ctgFreeTbCacheImpl(SCtgTbCache *pCache) { void ctgFreeTbCacheImpl(SCtgTbCache *pCache) {
qDebug("tbMeta freed, p:%p", pCache->pMeta);
taosMemoryFreeClear(pCache->pMeta); taosMemoryFreeClear(pCache->pMeta);
if (pCache->pIndex) { if (pCache->pIndex) {
taosArrayDestroyEx(pCache->pIndex->pIndex, tFreeSTableIndexInfo); taosArrayDestroyEx(pCache->pIndex->pIndex, tFreeSTableIndexInfo);
@ -831,6 +832,7 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput)
if (output->tbMeta) { if (output->tbMeta) {
int32_t metaSize = CTG_META_SIZE(output->tbMeta); int32_t metaSize = CTG_META_SIZE(output->tbMeta);
(*pOutput)->tbMeta = taosMemoryMalloc(metaSize); (*pOutput)->tbMeta = taosMemoryMalloc(metaSize);
qDebug("tbMeta cloned, size:%d, p:%p", metaSize, (*pOutput)->tbMeta);
if (NULL == (*pOutput)->tbMeta) { if (NULL == (*pOutput)->tbMeta) {
qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput)); qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
taosMemoryFreeClear(*pOutput); taosMemoryFreeClear(*pOutput);

View File

@ -1976,7 +1976,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "leastsquares", .name = "leastsquares",
.type = FUNCTION_TYPE_LEASTSQUARES, .type = FUNCTION_TYPE_LEASTSQUARES,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translateLeastSQR, .translateFunc = translateLeastSQR,
.getEnvFunc = getLeastSQRFuncEnv, .getEnvFunc = getLeastSQRFuncEnv,
.initFunc = leastSQRFunctionSetup, .initFunc = leastSQRFunctionSetup,

View File

@ -218,7 +218,7 @@ static SNode* createConstantValue() {
static int32_t calcConstProjections(SCalcConstContext* pCxt, SSelectStmt* pSelect, bool subquery) { static int32_t calcConstProjections(SCalcConstContext* pCxt, SSelectStmt* pSelect, bool subquery) {
SNode* pProj = NULL; SNode* pProj = NULL;
WHERE_EACH(pProj, pSelect->pProjectionList) { WHERE_EACH(pProj, pSelect->pProjectionList) {
if (subquery && isUselessCol((SExprNode*)pProj)) { if (subquery && !pSelect->isDistinct && isUselessCol((SExprNode*)pProj)) {
ERASE_NODE(pSelect->pProjectionList); ERASE_NODE(pSelect->pProjectionList);
continue; continue;
} }

View File

@ -4757,8 +4757,13 @@ static int32_t extractQueryResultSchema(const SNodeList* pProjections, int32_t*
int32_t index = 0; int32_t index = 0;
FOREACH(pNode, pProjections) { FOREACH(pNode, pProjections) {
SExprNode* pExpr = (SExprNode*)pNode; SExprNode* pExpr = (SExprNode*)pNode;
(*pSchema)[index].type = pExpr->resType.type; if (TSDB_DATA_TYPE_NULL == pExpr->resType.type) {
(*pSchema)[index].bytes = pExpr->resType.bytes; (*pSchema)[index].type = TSDB_DATA_TYPE_VARCHAR;
(*pSchema)[index].bytes = 0;
} else {
(*pSchema)[index].type = pExpr->resType.type;
(*pSchema)[index].bytes = pExpr->resType.bytes;
}
(*pSchema)[index].colId = index + 1; (*pSchema)[index].colId = index + 1;
if ('\0' != pExpr->userAlias[0]) { if ('\0' != pExpr->userAlias[0]) {
strcpy((*pSchema)[index].name, pExpr->userAlias); strcpy((*pSchema)[index].name, pExpr->userAlias);

View File

@ -282,7 +282,7 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
} }
*str = '"'; *str = '"';
int32_t length = taosUcs4ToMbs((TdUcs4 *)buf, bufSize, str + 1); int32_t length = taosUcs4ToMbs((TdUcs4*)buf, bufSize, str + 1);
if (length <= 0) { if (length <= 0) {
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }
@ -310,15 +310,15 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }
if(len) *len = n; if (len) *len = n;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
char* parseTagDatatoJson(void* p) { char* parseTagDatatoJson(void* p) {
char* string = NULL; char* string = NULL;
SArray* pTagVals = NULL; SArray* pTagVals = NULL;
cJSON* json = NULL; cJSON* json = NULL;
if (tTagToValArray((const STag*)p, &pTagVals) != 0) { if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
goto end; goto end;
} }
@ -327,7 +327,7 @@ char* parseTagDatatoJson(void* p) {
if (nCols == 0) { if (nCols == 0) {
goto end; goto end;
} }
char tagJsonKey[256] = {0}; char tagJsonKey[256] = {0};
json = cJSON_CreateObject(); json = cJSON_CreateObject();
if (json == NULL) { if (json == NULL) {
goto end; goto end;
@ -390,7 +390,7 @@ char* parseTagDatatoJson(void* p) {
end: end:
cJSON_Delete(json); cJSON_Delete(json);
taosArrayDestroy(pTagVals); taosArrayDestroy(pTagVals);
if(string == NULL){ if (string == NULL) {
string = strdup(TSDB_DATA_NULL_STR_L); string = strdup(TSDB_DATA_NULL_STR_L);
} }
return string; return string;

View File

@ -41,6 +41,8 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
if (pTask->execNodes) { if (pTask->execNodes) {
taosHashCleanup(pTask->execNodes); taosHashCleanup(pTask->execNodes);
} }
taosMemoryFree(pTask->profile.execTime);
} }
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum) { int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum) {

View File

@ -170,6 +170,7 @@
# --- valgrind # --- valgrind
./test.sh -f tsim/valgrind/checkError1.sim ./test.sh -f tsim/valgrind/checkError1.sim
./test.sh -f tsim/valgrind/checkError2.sim ./test.sh -f tsim/valgrind/checkError2.sim
./test.sh -f tsim/valgrind/checkError3.sim
# --- vnode # --- vnode
# ./test.sh -f tsim/vnode/replica3_basic.sim # ./test.sh -f tsim/vnode/replica3_basic.sim

View File

@ -35,36 +35,51 @@ if $rows != 1 then
return -1 return -1
endi endi
print =============== step3: create show table print =============== step4: create show table
sql create table ct1 using stb tags(1000) sql create table ct1 using stb tags(1000)
sql create table ct2 using stb tags(2000)
sql create table ct3 using stb tags(3000)
sql show tables sql show tables
if $rows != 1 then if $rows != 3 then
return -1 return -1
endi endi
print =============== step5: insert data print =============== step5: insert data
sql insert into ct1 values(now+0s, 10, 2.0, 3.0) sql insert into ct1 values(now+0s, 10, 2.0, 3.0)
sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3) sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3)
sql insert into ct2 values(now+0s, 10, 2.0, 3.0)
sql insert into ct2 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3)
sql insert into ct3 values('2021-01-01 00:00:00.000', 10, 2.0, 3.0)
print =============== step6: select data print =============== step6: query data
sql select * from ct1 sql select * from ct1
#sql select * from stb sql select * from stb
sql select c1, c2, c3 from ct1
sql select ts, c1, c2, c3 from stb
print =============== step7: count
sql select count(*) from ct1;
#sql select count(*) from stb;
#sql select count(ts), count(c1), count(c2), count(c3) from ct1
#sql select count(ts), count(c1), count(c2), count(c3) from stb
print =============== step8: func
#sql select first(ts), first(c1), first(c2), first(c3) from ct1
#sql select min(c1), min(c2), min(c3) from ct1
#sql select max(c1), max(c2), max(c3) from ct1
#sql select sum(c1), sum(c2), sum(c3) from ct1
_OVER: _OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
print =============== check print =============== check
print ----> start to check if there are ERRORS in vagrind log file for each dnode
system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ]
if $system_content <= 0 then
return 0
endi
$null= $null=
if $system_content == $null then
return 0 system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ]
if $system_content > 1 then
return -1
endi endi
return -1 if $system_content == $null then
return -1
endi

View File

@ -23,48 +23,74 @@ if $data(1)[4] != ready then
endi endi
print =============== step2: create db print =============== step2: create db
sql create database d1 vgroups 1 buffer 3 sql create database d1 vgroups 3 buffer 3
sql show databases sql show databases
sql use d1 sql use d1
sql show vgroups sql show vgroups
print =============== step3: create show stable print =============== step3: create show stable, include all type
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned) sql create table if not exists stb (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 float, c7 double, c8 binary(16), c9 nchar(16), c10 timestamp, c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned) tags (t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint, t6 float, t7 double, t8 binary(16), t9 nchar(16), t10 timestamp, t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)
sql create stable if not exists stb_1 (ts timestamp, c1 int) tags (j int)
sql create table stb_2 (ts timestamp, c1 int) tags (t1 int)
sql create stable stb_3 (ts timestamp, c1 int) tags (t1 int)
sql show stables sql show stables
if $rows != 1 then if $rows != 4 then
return -1 return -1
endi endi
print =============== step3: create show table print =============== step4: ccreate child table
sql create table ct1 using stb tags(1000) sql create table c1 using stb tags(true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
#sql show tables sql create table c2 using stb tags(false, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 2', 'child tbl 2', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
#if $rows != 1 then sql show tables
# return -1 if $rows != 2 then
#endi return -1
endi
print =============== step5: insert data print =============== step5: insert data
sql insert into ct1 values(now+0s, 10, 2.0, 3.0) sql insert into c1 values(now-1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3) sql insert into c1 values(now+0s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+2s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
sql insert into c2 values(now-1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
sql insert into c2 values(now+0s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+2s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
print =============== step6: select data print =============== step6: alter insert
sql select * from ct1 sql insert into c3 using stb tags(true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) values(now-1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
sql insert into c3 using stb tags(true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) values(now+0s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
goto _OVER
print =============== stepa: query data
sql select * from c1
sql select * from stb sql select * from stb
sql select * from stb_1
sql select ts, c1, c2, c3 from c1
sql select ts, c1, c2, c3 from stb
sql select ts, c1 from stb_2
sql select ts, c1, t1 from c1
sql select ts, c1, t1 from stb
sql select ts, c1, t1 from stb_2
print =============== stepb: count
#sql select count(*) from c1;
#sql select count(*) from stb;
#sql select count(ts), count(c1), count(c2), count(c3) from c1
#sql select count(ts), count(c1), count(c2), count(c3) from stb
print =============== stepc: func
#sql select first(ts), first(c1), first(c2), first(c3) from c1
#sql select min(c1), min(c2), min(c3) from c1
#sql select max(c1), max(c2), max(c3) from c1
#sql select sum(c1), sum(c2), sum(c3) from c1
_OVER: _OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
print =============== check print =============== check
print ----> start to check if there are ERRORS in vagrind log file for each dnode
system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ]
if $system_content <= 0 then
return 0
endi
$null= $null=
if $system_content == $null then
return 0 system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ]
if $system_content > 1 then
return -1
endi endi
return -1 if $system_content == $null then
return -1
endi

View File

@ -36,7 +36,12 @@ sql create dnode $hostname port 7200
sql drop dnode 2 sql drop dnode 2
sql alter dnode 1 'debugflag 131' sql alter dnode 1 'debugflag 131'
print =============== step4: print =============== create database, stable, table
sql create database db vgroups 3
sql use db
sql create table stb (ts timestamp, c int) tags (t int)
sql create table t0 using stb tags (0)
sql create table tba (ts timestamp, c1 binary(10), c2 nchar(10));
print =============== run show xxxx print =============== run show xxxx
sql show dnodes sql show dnodes
@ -50,6 +55,16 @@ if $rows != 1 then
endi endi
sql show databases sql show databases
if $rows != 3 then
return -1
endi
sql show stables
if $rows != 1 then
return -1
endi
sql show tables
if $rows != 2 then if $rows != 2 then
return -1 return -1
endi endi
@ -70,11 +85,31 @@ if $rows != 1 then
return -1 return -1
endi endi
sql select * from information_schema.user_databases
if $rows != 3 then
return -1
endi
sql select * from information_schema.user_stables
if $rows != 1 then
return -1
endi
sql select * from information_schema.user_tables
if $rows != 30 then
return -1
endi
sql select * from information_schema.user_users sql select * from information_schema.user_users
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
sql select * from information_schema.`vgroups`
if $rows != 3 then
return -1
endi
sql show variables; sql show variables;
if $rows != 4 then if $rows != 4 then
return -1 return -1
@ -94,17 +129,14 @@ print =============== stop
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
print =============== check print =============== check
print ----> start to check if there are ERRORS in vagrind log file for each dnode
system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ]
if $system_content <= 0 then
return 0
endi
$null= $null=
if $system_content == $null then
return 0 system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ]
if $system_content > 1 then
return -1
endi endi
return -1 if $system_content == $null then
return -1
endi

View File

@ -35,7 +35,7 @@ if $rows != 1 then
return -1 return -1
endi endi
print =============== step3: create show table print =============== step4: create show table
sql create table ct1 using stb tags(1000) sql create table ct1 using stb tags(1000)
sql show tables sql show tables
if $rows != 1 then if $rows != 1 then
@ -54,17 +54,14 @@ _OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
print =============== check print =============== check
print ----> start to check if there are ERRORS in vagrind log file for each dnode
system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ]
if $system_content <= 0 then
return 0
endi
$null= $null=
if $system_content == $null then
return 0 system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ]
if $system_content > 1 then
return -1
endi endi
return -1 if $system_content == $null then
return -1
endi

View File

@ -4,7 +4,7 @@ system sh/cfg.sh -n dnode1 -c debugflag -v 131
system sh/exec.sh -n dnode1 -s start -v system sh/exec.sh -n dnode1 -s start -v
sql connect sql connect
print =============== step1: show dnodes print =============== step1: create drop show dnodes
$x = 0 $x = 0
step1: step1:
$x = $x + 1 $x = $x + 1
@ -22,124 +22,74 @@ if $data(1)[4] != ready then
goto step1 goto step1
endi endi
print =============== step2: create alter drop show user print =============== step2: create db
sql create user u1 pass 'taosdata' sql create database d1 vgroups 3 buffer 3
sql show users
sql alter user u1 sysinfo 1
sql alter user u1 enable 1
sql alter user u1 pass 'taosdata'
sql drop user u1
sql_error alter user u2 sysinfo 0
print =============== step3: create drop dnode
sql create dnode $hostname port 7200
sql drop dnode 2
sql alter dnode 1 'debugflag 131'
print =============== create database, stable, table
sql create database db vgroups 3
sql use db
sql create table stb (ts timestamp, c int) tags (t int)
sql create table t0 using stb tags (0)
sql create table tba (ts timestamp, c1 binary(10), c2 nchar(10));
print =============== run show xxxx
sql show dnodes
if $rows != 1 then
return -1
endi
sql show mnodes
if $rows != 1 then
return -1
endi
sql show databases sql show databases
if $rows != 3 then sql use d1
return -1 sql show vgroups
endi
print =============== step3: create show stable, include all type
sql create table if not exists stb (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 float, c7 double, c8 binary(16), c9 nchar(16), c10 timestamp, c11 tinyint unsigned, c12 smallint unsigned, c13 int unsigned, c14 bigint unsigned) tags (t1 bool, t2 tinyint, t3 smallint, t4 int, t5 bigint, t6 float, t7 double, t8 binary(16), t9 nchar(16), t10 timestamp, t11 tinyint unsigned, t12 smallint unsigned, t13 int unsigned, t14 bigint unsigned)
sql create stable if not exists stb_1 (ts timestamp, c1 int) tags (j int)
sql create table stb_2 (ts timestamp, c1 int) tags (t1 int)
sql create stable stb_3 (ts timestamp, c1 int) tags (t1 int)
sql show stables sql show stables
if $rows != 1 then if $rows != 4 then
return -1 return -1
endi endi
print =============== step4: ccreate child table
sql create table c1 using stb tags(true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
sql create table c2 using stb tags(false, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 2', 'child tbl 2', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
sql show tables sql show tables
if $rows != 2 then if $rows != 2 then
return -1 return -1
endi endi
sql show users print =============== step5: insert data
if $rows != 1 then sql insert into c1 values(now-1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
return -1 sql insert into c1 values(now+0s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+2s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
endi sql insert into c2 values(now-1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
sql insert into c2 values(now+0s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+2s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
print =============== run select * from information_schema.xxxx print =============== step6: alter insert
sql select * from information_schema.`dnodes` sql insert into c3 using stb tags(true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) values(now-1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
if $rows != 1 then sql insert into c3 using stb tags(true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) values(now+0s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
return -1
endi
sql select * from information_schema.`mnodes` print =============== stepa: query data
if $rows != 1 then sql select * from c1
return -1 sql select * from stb
endi sql select * from stb_1
sql select ts, c1, c2, c3 from c1
sql select ts, c1, c2, c3 from stb
sql select ts, c1 from stb_2
sql select ts, c1, t1 from c1
sql select ts, c1, t1 from stb
sql select ts, c1, t1 from stb_2
sql select * from information_schema.user_databases print =============== stepb: count
if $rows != 3 then #sql select count(*) from c1;
return -1 #sql select count(*) from stb;
endi #sql select count(ts), count(c1), count(c2), count(c3) from c1
#sql select count(ts), count(c1), count(c2), count(c3) from stb
sql select * from information_schema.user_stables print =============== stepc: func
if $rows != 1 then #sql select first(ts), first(c1), first(c2), first(c3) from c1
return -1 #sql select min(c1), min(c2), min(c3) from c1
endi #sql select max(c1), max(c2), max(c3) from c1
#sql select sum(c1), sum(c2), sum(c3) from c1
sql select * from information_schema.user_tables _OVER:
if $rows != 30 then
return -1
endi
sql select * from information_schema.user_users
if $rows != 1 then
return -1
endi
sql select * from information_schema.`vgroups`
if $rows != 3 then
return -1
endi
sql show variables;
if $rows != 4 then
return -1
endi
sql show dnode 1 variables;
if $rows <= 0 then
return -1
endi
sql show local variables;
if $rows <= 0 then
return -1
endi
print =============== stop
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
print =============== check print =============== check
print ----> start to check if there are ERRORS in vagrind log file for each dnode
system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ]
if $system_content <= 0 then
return 0
endi
$null= $null=
if $system_content == $null then
return 0 system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ]
if $system_content > 1 then
return -1
endi endi
return -1 if $system_content == $null then
return -1
endi

View File

@ -371,7 +371,7 @@ class TMQCom:
elif (i % 3 == 0): elif (i % 3 == 0):
tagBinaryValue = 'changsha' tagBinaryValue = 'changsha'
sql += " %s.%s_%d using %s.%s tags (%d, %d, %d, '%s', '%s') values "%(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue) sql += " %s.%s%d using %s.%s tags (%d, %d, %d, '%s', '%s') values "%(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue)
for j in range(rowsPerTbl): for j in range(rowsPerTbl):
sql += "(%d, %d, %d, %d, 'binary_%d', 'nchar_%d', now) "%(startTs+j, j,j, j,i+ctbStartIdx,rowsBatched) sql += "(%d, %d, %d, %d, 'binary_%d', 'nchar_%d', now) "%(startTs+j, j,j, j,i+ctbStartIdx,rowsBatched)
rowsBatched += 1 rowsBatched += 1
@ -379,7 +379,7 @@ class TMQCom:
tsql.execute(sql) tsql.execute(sql)
rowsBatched = 0 rowsBatched = 0
if j < rowsPerTbl - 1: if j < rowsPerTbl - 1:
sql = "insert into %s.%s_%d using %s.%s tags (%d, %d, %d, '%s', '%s') values " %(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue) sql = "insert into %s.%s%d using %s.%s tags (%d, %d, %d, '%s', '%s') values " %(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue)
else: else:
sql = "insert into " sql = "insert into "
#end sql #end sql

View File

@ -0,0 +1,259 @@
import taos
import sys
import time
import socket
import os
import threading
from enum import Enum
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.snapshot = 0
self.vgroups = 4
self.ctbNum = 1
self.rowsPerTbl = 100000
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1,
'rowsPerTbl': 100000,
'batchNum': 1200,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdSql.query("flush database %s"%(paraDict['dbName']))
return
# 自动建表完成数据插入,启动消费
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1,
'rowsPerTbl': 100000,
'batchNum': 3000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
# update to half tables
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 0
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 3/2)
topicList = topicFromStb1
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("insert process end, and start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
tdSql.query(queryString)
totalRowsInserted = tdSql.getRows()
tdLog.info("act consume rows: %d, expect consume rows: %d, act insert rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsInserted))
if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!")
tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 5000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdSql.query("flush database %s"%(paraDict['dbName']))
# update to half tables
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2)
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom.initConsumerTable()
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 1
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2)
topicList = topicFromStb1
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("insert process end, and start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
tdSql.query(queryString)
totalRowsInserted = tdSql.getRows()
tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt))
if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!")
# tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
tdSql.prepare()
# self.prepareTestEnv()
# tdLog.printNoPrefix("=============================================")
# tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
# self.tmqCase1()
# self.tmqCase2()
self.prepareTestEnv()
tdLog.printNoPrefix("====================================================================")
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
self.snapshot = 1
self.tmqCase1()
# self.tmqCase2()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -16,9 +16,11 @@ from tmqCommon import *
class TDTestCase: class TDTestCase:
def __init__(self): def __init__(self):
self.snapshot = 0
self.vgroups = 4 self.vgroups = 4
self.ctbNum = 500 self.ctbNum = 100
self.rowsPerTbl = 1000 self.rowsPerTbl = 1000
self.autoCtbPrefix = 'aCtb'
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
@ -39,7 +41,7 @@ class TDTestCase:
'ctbStartIdx': 0, 'ctbStartIdx': 0,
'ctbNum': 1000, 'ctbNum': 1000,
'rowsPerTbl': 1000, 'rowsPerTbl': 1000,
'batchNum': 400, 'batchNum': 10000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3, 'pollDelay': 3,
'showMsg': 1, 'showMsg': 1,
@ -61,7 +63,7 @@ class TDTestCase:
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix,
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
@ -85,20 +87,21 @@ class TDTestCase:
'ctbStartIdx': 0, 'ctbStartIdx': 0,
'ctbNum': 1000, 'ctbNum': 1000,
'rowsPerTbl': 1000, 'rowsPerTbl': 1000,
'batchNum': 400, 'batchNum': 10000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5, 'pollDelay': 5,
'showMsg': 1, 'showMsg': 1,
'showRow': 1, 'showRow': 1,
'snapshot': 0} 'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
# update to half tables # update to half tables
paraDict['ctbNum'] = int(self.ctbNum / 2) paraDict['ctbNum'] = int(self.ctbNum/2)
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix,
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
@ -113,11 +116,16 @@ class TDTestCase:
tdSql.execute(sqlString) tdSql.execute(sqlString)
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
consumerId = 0 consumerId = 0
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 3) if self.snapshot == 0:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2))
elif self.snapshot == 1:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2))
topicList = topicFromStb1 topicList = topicFromStb1
ifcheckdata = 0 ifcheckdata = 1
ifManualCommit = 0 ifManualCommit = 1
keyList = 'group.id:cgrp1,\ keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\ enable.auto.commit:true,\
auto.commit.interval.ms:1000,\ auto.commit.interval.ms:1000,\
@ -127,7 +135,7 @@ class TDTestCase:
tdLog.info("start consume processor") tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("insert process end, and start to check consume result") tdLog.info("start to check consume result")
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0 totalConsumeRows = 0
@ -141,8 +149,9 @@ class TDTestCase:
if totalConsumeRows != expectrowcnt: if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicFromStb1) # tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 1 end ...... ") tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self): def tmqCase2(self):
@ -160,13 +169,14 @@ class TDTestCase:
'ctbStartIdx': 0, 'ctbStartIdx': 0,
'ctbNum': 1000, 'ctbNum': 1000,
'rowsPerTbl': 1000, 'rowsPerTbl': 1000,
'batchNum': 1000, 'batchNum': 10000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5, 'pollDelay': 5,
'showMsg': 1, 'showMsg': 1,
'showRow': 1, 'showRow': 1,
'snapshot': 1} 'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
@ -175,13 +185,20 @@ class TDTestCase:
tdSql.query("flush database %s"%(paraDict['dbName'])) tdSql.query("flush database %s"%(paraDict['dbName']))
# update to half tables # update to half tables
paraDict['ctbNum'] = int(self.ctbNum / 2) paraDict['ctbNum'] = int(self.ctbNum/2)
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2)
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=self.autoCtbPrefix,
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+paraDict['ctbNum']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2))
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="aCtby",
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2))
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+paraDict['ctbNum']) startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+int(self.ctbNum/2))
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdLog.info("create topics from stb1") tdLog.info("create topics from stb1")
@ -191,12 +208,18 @@ class TDTestCase:
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
# paraDict['ctbNum'] = self.ctbNum
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
consumerId = 0 paraDict['rowsPerTbl'] = self.rowsPerTbl
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 * 2 consumerId = 1
if self.snapshot == 0:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2*2 + 1/2*1/2))
elif self.snapshot == 1:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2 + 1/2*1/2))
topicList = topicFromStb1 topicList = topicFromStb1
ifcheckdata = 0 ifcheckdata = 1
ifManualCommit = 0 ifManualCommit = 1
keyList = 'group.id:cgrp1,\ keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\ enable.auto.commit:true,\
auto.commit.interval.ms:1000,\ auto.commit.interval.ms:1000,\
@ -206,7 +229,7 @@ class TDTestCase:
tdLog.info("start consume processor") tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("insert process end, and start to check consume result") tdLog.info("start to check consume result")
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0 totalConsumeRows = 0
@ -218,10 +241,10 @@ class TDTestCase:
tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt)) tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt))
if totalConsumeRows != totalRowsInserted: if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
tmqCom.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1)
@ -230,6 +253,15 @@ class TDTestCase:
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
self.prepareTestEnv() self.prepareTestEnv()
tdLog.printNoPrefix("=============================================")
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.tmqCase1()
self.tmqCase2()
self.prepareTestEnv()
tdLog.printNoPrefix("====================================================================")
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
self.snapshot = 1
self.tmqCase1() self.tmqCase1()
self.tmqCase2() self.tmqCase2()

View File

@ -178,6 +178,8 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py
#python3 ./test.py -f 7-tmq/tmqDnodeRestart.py #python3 ./test.py -f 7-tmq/tmqDnodeRestart.py
#python3 ./test.py -f 7-tmq/tmqUpdate-1ctb.py
python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb.py
#------------querPolicy 2----------- #------------querPolicy 2-----------