Merge branch '3.0' into merge/3.0tomain

This commit is contained in:
Shengliang Guan 2024-12-25 17:41:45 +08:00
commit 018148d9ee
32 changed files with 1659 additions and 2572 deletions

View File

@ -2,7 +2,7 @@
# taosadapter
ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
GIT_TAG main
GIT_TAG 3.0
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG main
GIT_TAG 3.0
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -2,7 +2,7 @@
# taosws-rs
ExternalProject_Add(taosws-rs
GIT_REPOSITORY https://github.com/taosdata/taos-connector-rust.git
GIT_TAG main
GIT_TAG 3.0
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -160,7 +160,7 @@ int32_t metaAlterSuperTable(SMeta* pMeta, int64_t version, SVCreateStbRe
int32_t metaDropSuperTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
int32_t metaCreateTable2(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** ppRsp);
int32_t metaDropTable2(SMeta* pMeta, int64_t version, SVDropTbReq* pReq);
int32_t metaTrimTables(SMeta* pMeta);
int32_t metaTrimTables(SMeta* pMeta, int64_t version);
int32_t metaDropMultipleTables(SMeta* pMeta, int64_t version, SArray* tbUids);
int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tbUids, int32_t ttlDropMaxCount);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);

View File

@ -1645,7 +1645,6 @@ static int32_t metaHandleSuperTableUpdate(SMeta *pMeta, const SMetaEntry *pEntry
metaFetchEntryFree(&pOldEntry);
return code;
}
// TAOS_CHECK_RETURN(metaGetSubtables(pMeta, pEntry->uid, uids));
TAOS_CHECK_RETURN(tsdbCacheNewSTableColumn(pTsdb, uids, cid, col_type));
} else if (deltaCol == -1) {
int16_t cid = -1;
@ -1667,7 +1666,6 @@ static int32_t metaHandleSuperTableUpdate(SMeta *pMeta, const SMetaEntry *pEntry
metaFetchEntryFree(&pOldEntry);
return code;
}
// TAOS_CHECK_RETURN(metaGetSubtables(pMeta, pEntry->uid, uids));
TAOS_CHECK_RETURN(tsdbCacheDropSTableColumn(pTsdb, uids, cid, hasPrimaryKey));
}
}

File diff suppressed because it is too large Load Diff

View File

@ -843,7 +843,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb, .pWorkerCb = pInfo->workerCb};
initStorageAPI(&handle.api);
int32_t code = TSDB_CODE_SUCCESS;
bool redirected = false;
bool redirected = false;
switch (pMsg->msgType) {
case TDMT_SCH_QUERY:
@ -2145,7 +2145,7 @@ static int32_t vnodeConsolidateAlterHashRange(SVnode *pVnode, int64_t ver) {
pVnode->config.hashBegin, pVnode->config.hashEnd, ver);
// TODO: trim meta of tables from TDB per hash range [pVnode->config.hashBegin, pVnode->config.hashEnd]
code = metaTrimTables(pVnode->pMeta);
code = metaTrimTables(pVnode->pMeta, ver);
return code;
}

View File

@ -424,6 +424,7 @@ int32_t ctgGetTbCfg(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName,
CTG_RET(TSDB_CODE_SUCCESS);
}
#if 0
int32_t ctgGetTbTag(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName, SArray** pRes) {
SVgroupInfo vgroupInfo = {0};
STableCfg* pCfg = NULL;
@ -474,6 +475,7 @@ _return:
CTG_RET(code);
}
#endif
int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName, SArray** pVgList) {
STableMeta* tbMeta = NULL;
@ -1695,6 +1697,7 @@ _return:
CTG_API_LEAVE(code);
}
#if 0
int32_t catalogGetTableTag(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SArray** pRes) {
CTG_API_ENTER();
@ -1709,6 +1712,7 @@ _return:
CTG_API_LEAVE(code);
}
#endif
int32_t catalogRefreshGetTableCfg(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableCfg** pCfg) {
CTG_API_ENTER();
@ -1845,7 +1849,7 @@ _return:
CTG_API_LEAVE(code);
}
#if 0
int32_t catalogAsyncUpdateViewMeta(SCatalog* pCtg, SViewMetaRsp* pMsg) {
CTG_API_ENTER();
@ -1860,6 +1864,7 @@ _return:
CTG_API_LEAVE(code);
}
#endif
int32_t catalogGetViewMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pViewName, STableMeta** pTableMeta) {
CTG_API_ENTER();

View File

@ -2992,6 +2992,7 @@ TEST(apiTest, catalogGetTableIndex_test) {
catalogDestroy();
}
TEST(apiTest, catalogGetDBCfg_test) {
struct SCatalog *pCtg = NULL;
SRequestConnInfo connInfo = {0};

View File

@ -41,6 +41,7 @@
#include "tvariant.h"
#include "stub.h"
#include "querytask.h"
#include "hashjoin.h"
namespace {
@ -3766,6 +3767,21 @@ TEST(leftWinJoin, noCondProjectionTest) {
#endif
#if 1
TEST(functionsTest, branch) {
struct SOperatorInfo op = {0};
SHJoinOperatorInfo join;
SBufRowInfo bufrow = {0};
SSDataBlock blk = {0};
op.info = &join;
memset(&join, 0, sizeof(join));
join.ctx.pBuildRow = &bufrow;
blk.info.rows = 1;
join.finBlk = &blk;
hInnerJoinDo(&op);
}
#endif
int main(int argc, char** argv) {

View File

@ -161,6 +161,10 @@ int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen) {
int32_t qStringToSubplan(const char* pStr, SSubplan** pSubplan) { return nodesStringToNode(pStr, (SNode**)pSubplan); }
int32_t qSubPlanToMsg(const SSubplan* pSubplan, char** pStr, int32_t* pLen) {
if (NULL == pSubplan) {
return terrno = TSDB_CODE_INVALID_PARA;
}
if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType && NULL == pSubplan->pNode) {
SDataInserterNode* insert = (SDataInserterNode*)pSubplan->pDataSink;
*pLen = insert->size;

View File

@ -1,6 +1,7 @@
#include "qwInt.h"
#include "qworker.h"
#if 0
void qwSetConcurrentTaskNumCb(int32_t taskNum) {
int32_t finTaskNum = TMIN(taskNum, tsNumOfQueryThreads * QW_DEFAULT_THREAD_TASK_NUM);
@ -33,6 +34,7 @@ void qwIncConcurrentTaskNumCb(void) {
//TODO
}
#endif
int32_t qwInitJobInfo(QW_FPARAMS_DEF, SQWJobInfo* pJob) {
pJob->pSessions= taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);

View File

@ -149,11 +149,11 @@ typedef struct SSchedulerMgmt {
bool exit;
int32_t jobRef;
int32_t jobNum;
SSchStat stat;
void *timer;
SRWLatch hbLock;
SHashObj *hbConnections;
void *queryMgmt;
SSchStat stat;
} SSchedulerMgmt;
typedef struct SSchCallbackParamHeader {

View File

@ -50,11 +50,6 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
int32_t taskNum = taosArrayGetSize(pJob->dataSrcTasks);
for (int32_t i = 0; i < taskNum; ++i) {
SSchTask *pTask = *(SSchTask **)taosArrayGet(pJob->dataSrcTasks, i);
if (NULL == pTask) {
SCH_JOB_DLOG("fail to get the %dth task", i);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
sum += pTask->plan->execNodeStat.tableNum;
}

View File

@ -273,18 +273,18 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
if (taskDone < pTask->level->taskNum) {
SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
return TSDB_CODE_SUCCESS;
} else if (taskDone > pTask->level->taskNum) {
SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", taskDone, pTask->level->taskNum);
}
SCH_TASK_DLOG("taskDone number reach level task number, done:%d, total:%d", taskDone, pTask->level->taskNum);
if (pTask->level->taskFailed > 0) {
SCH_RET(schHandleJobFailure(pJob, pJob->errCode));
} else {
SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
}
} else {
pJob->resNode = pTask->succeedAddr;
SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
}
pJob->resNode = pTask->succeedAddr;
pJob->fetchTask = pTask;
@ -600,12 +600,12 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
if (0 != code) {
if (HASH_NODE_EXIST(code)) {
SCH_TASK_DLOG("task already in execTask list, code:%x", code);
SCH_TASK_DLOG("task already in execTask list, code:0x%x", code);
return TSDB_CODE_SUCCESS;
}
SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:0x%x", errno);
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
SCH_TASK_ELOG("taosHashPut task to execTask list failed, code:0x%x", code);
SCH_ERR_RET(code);
}
SCH_TASK_DLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
@ -800,11 +800,6 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
for (int32_t i = 0; i < nodeNum; ++i) {
SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i);
if (NULL == nload) {
SCH_TASK_ELOG("fail to get the %dth node in nodeList, nodeNum:%d", i, nodeNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SQueryNodeAddr *naddr = &nload->addr;
if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {

View File

@ -60,6 +60,19 @@ extern "C" int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t
extern "C" int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code);
extern "C" int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code);
extern "C" int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask);
extern "C" int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType);
//extern "C" int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode);
extern "C" int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode);
extern "C" void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel);
extern "C" int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask);
extern "C" int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId);
extern "C" int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask);
extern "C" int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask);
extern "C" int32_t schNotifyTaskOnExecNode(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType type);
extern "C" int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask);
extern "C" int32_t schLaunchTaskImpl(void *param);
extern "C" void schHandleTimerEvent(void *param, void *tmrId);
int64_t insertJobRefId = 0;
int64_t queryJobRefId = 0;
@ -1473,6 +1486,263 @@ TEST(otherTest, function) {
schMgmt.jobRef = -1;
}
TEST(otherTest, branch) {
SSchJob job = {0};
SSchTask task = {0};
memset(&schMgmt, 0, sizeof(schMgmt));
schValidateRspMsgType(&job, &task, TDMT_SCH_MERGE_FETCH_RSP);
task.lastMsgType = TDMT_SCH_MERGE_FETCH_RSP - 1;
schValidateRspMsgType(&job, &task, TDMT_SCH_MERGE_FETCH_RSP);
schValidateRspMsgType(&job, &task, 0);
schValidateRspMsgType(&job, &task, TDMT_SCH_QUERY_RSP);
task.lastMsgType = TDMT_SCH_QUERY_RSP - 1;
schValidateRspMsgType(&job, &task, TDMT_SCH_QUERY_RSP);
schProcessFetchRsp(&job, &task, NULL, -1);
schProcessFetchRsp(&job, &task, NULL, 0);
job.fetchRes = (void*)0x1;
schProcessFetchRsp(&job, &task, (char*)taosMemoryMalloc(0), 0);
job.fetchRes = NULL;
SDataBuf databuf = {0};
databuf.msgType = TDMT_VND_ALTER_TABLE_RSP;
job.status = JOB_TASK_STATUS_FAIL;
schProcessResponseMsg(&job, &task, &databuf, 0);
databuf.msgType = TDMT_VND_SUBMIT_RSP;
databuf.pData = taosMemoryMalloc(0);
job.status = JOB_TASK_STATUS_FAIL;
schProcessResponseMsg(&job, &task, &databuf, 0);
databuf.msgType = TDMT_VND_DELETE_RSP;
databuf.pData = taosMemoryMalloc(0);
job.status = JOB_TASK_STATUS_FAIL;
schProcessResponseMsg(&job, &task, &databuf, 0);
databuf.msgType = TDMT_SCH_QUERY_RSP;
job.status = JOB_TASK_STATUS_FAIL;
schProcessResponseMsg(&job, &task, &databuf, 0);
databuf.msgType = TDMT_SCH_QUERY_RSP;
databuf.pData = taosMemoryMalloc(0);
job.status = JOB_TASK_STATUS_FAIL;
schProcessResponseMsg(&job, &task, &databuf, 0);
databuf.msgType = TDMT_SCH_EXPLAIN_RSP;
job.status = JOB_TASK_STATUS_FAIL;
schProcessResponseMsg(&job, &task, &databuf, 0);
databuf.msgType = TDMT_SCH_EXPLAIN_RSP;
databuf.pData = taosMemoryMalloc(0);
job.status = JOB_TASK_STATUS_FAIL;
schProcessResponseMsg(&job, &task, &databuf, 0);
job.status = 0;
job.attr.explainMode = EXPLAIN_MODE_ANALYZE;
databuf.msgType = TDMT_SCH_EXPLAIN_RSP;
databuf.pData = taosMemoryMalloc(0);
job.status = JOB_TASK_STATUS_FAIL;
job.fetchRes = (void*)0x1;
schProcessResponseMsg(&job, &task, &databuf, 0);
job.fetchRes = NULL;
job.attr.explainMode = EXPLAIN_MODE_ANALYZE;
databuf.msgType = TDMT_SCH_EXPLAIN_RSP;
databuf.pData = taosMemoryMalloc(0);
job.status = JOB_TASK_STATUS_FAIL;
schProcessResponseMsg(&job, &task, &databuf, 0);
databuf.msgType = TDMT_SCH_DROP_TASK_RSP;
job.status = JOB_TASK_STATUS_FAIL;
schProcessResponseMsg(&job, &task, &databuf, 0);
databuf.msgType = TDMT_SCH_LINK_BROKEN;
job.status = JOB_TASK_STATUS_FAIL;
schProcessResponseMsg(&job, &task, &databuf, 0);
databuf.msgType = 0;
job.status = JOB_TASK_STATUS_FAIL;
schProcessResponseMsg(&job, &task, &databuf, 0);
databuf.pData = taosMemoryMalloc(0);
schHandleHbCallback(NULL, &databuf, 0);
__async_send_cb_fn_t fp = NULL;
schGetCallbackFp(TDMT_SCH_TASK_NOTIFY, &fp);
schGetCallbackFp(0, &fp);
SQueryNodeEpId ep = {0};
schBuildAndSendHbMsg(&ep, NULL);
schBuildAndSendMsg(&job, &task, NULL, 0, NULL);
SSchLevel level = {0};
SSubplan subplan;
memset(&subplan, 0, sizeof(subplan));
job.attr.queryJob = true;
schMgmt.cfg.schPolicy = SCH_ALL;
task.plan = &subplan;
schInitTaskRetryTimes(&job, &task, &level);
job.attr.queryJob = false;
memset(&schMgmt.cfg, 0, sizeof(schMgmt.cfg));
memset(&level, 0, sizeof(level));
schRecordTaskSucceedNode(&job, &task);
schDropTaskExecNode(&job, &task, NULL, 0);
task.execNodes = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
schDropTaskExecNode(&job, &task, NULL, 0);
int32_t execId = 0;
task.execId = 1;
(void)taosHashPut(task.execNodes, &execId, sizeof(execId), &execId, sizeof(execId));
schDropTaskExecNode(&job, &task, NULL, execId);
task.execId = 0;
taosHashCleanup(task.execNodes);
task.execNodes = NULL;
job.status = JOB_TASK_STATUS_FAIL;
schProcessOnTaskFailure(&job, &task, 0);
job.status = 0;
task.status = JOB_TASK_STATUS_FAIL;
schProcessOnTaskFailure(&job, &task, 0);
task.status = 0;
task.level = &level;
schProcessOnTaskFailure(&job, &task, TSDB_CODE_SCH_TIMEOUT_ERROR);
memset(&level, 0, sizeof(level));
task.level = NULL;
subplan.subplanType = SUBPLAN_TYPE_SCAN;
task.plan = &subplan;
SEpSet epset = {0};
epset.numOfEps = 127;
schChkUpdateRedirectCtx(&job, &task, &epset, 0);
schChkUpdateRedirectCtx(&job, &task, NULL, 0);
task.plan = NULL;
schPushTaskToExecList(&job, &task);
job.execTasks = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
taosHashPut(job.execTasks, &task.taskId, sizeof(task.taskId), &task, POINTER_BYTES);
schPushTaskToExecList(&job, &task);
taosHashCleanup(job.execTasks);
job.execTasks = NULL;
bool needRetry = false;
task.timeoutUsec = SCH_MAX_TASK_TIMEOUT_USEC / 2 + 1;
task.retryTimes = 0;
task.maxRetryTimes = 0;
schTaskCheckSetRetry(&job, &task, TSDB_CODE_SCH_TIMEOUT_ERROR, &needRetry);
task.execId = 0;
task.retryTimes = 0;
task.maxRetryTimes = 100;
task.maxExecTimes = 1;
schTaskCheckSetRetry(&job, &task, TSDB_CODE_SCH_TIMEOUT_ERROR, &needRetry);
task.execId = 0;
task.retryTimes = 0;
task.maxRetryTimes = 100;
task.maxExecTimes = 100;
task.lastMsgType = TDMT_SCH_LINK_BROKEN;
schTaskCheckSetRetry(&job, &task, TSDB_CODE_SCH_TIMEOUT_ERROR, &needRetry);
schSetAddrsFromNodeList(&job, &task);
schSwitchTaskCandidateAddr(&job, &task);
task.candidateAddrs = taosArrayInit(SCHEDULE_DEFAULT_MAX_NODE_NUM, sizeof(SQueryNodeAddr));
SQueryNodeAddr addr = {0};
taosArrayPush(task.candidateAddrs, &addr);
taosArrayPush(task.candidateAddrs, &addr);
schMgmt.cfg.schPolicy = SCH_LOAD_SEQ;
task.candidateIdx = 1;
schSwitchTaskCandidateAddr(&job, &task);
schMgmt.cfg.schPolicy = SCH_RANDOM;
schSwitchTaskCandidateAddr(&job, &task);
taosArrayDestroy(task.candidateAddrs);
task.candidateAddrs = NULL;
memset(&schMgmt.cfg, 0, sizeof(schMgmt.cfg));
task.candidateIdx = 0;
schDropTaskOnExecNode(&job, &task);
schNotifyTaskOnExecNode(&job, &task, TASK_NOTIFY_FINISHED);
schLaunchRemoteTask(&job, &task);
SSchTaskCtx* pCtx = (SSchTaskCtx*)taosMemoryCalloc(1, sizeof(SSchTaskCtx));
pCtx->jobRid = -1;
schLaunchTaskImpl((void*)pCtx);
task.plan = &subplan;
subplan.subplanType = SUBPLAN_TYPE_SCAN;
job.attr.needFlowCtrl = true;
level.taskNum = 1000;
task.level = &level;
schLaunchTask(&job, &task);
task.plan = NULL;
task.level = NULL;
job.attr.needFlowCtrl = false;
SSchTimerParam param = {0};
param.rId = -1;
schHandleTimerEvent(&param, NULL);
job.execTasks = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
task.delayExecMs = 1;
schMgmt.timer = NULL;
schDelayLaunchTask(&job, &task);
task.delayExecMs = 0;
taosHashCleanup(job.execTasks);
job.execTasks = NULL;
job.fetchRes = (void*)0x1;
schLaunchFetchTask(&job);
job.fetchRes = NULL;
job.fetchTask = &task;
job.attr.localExec = true;
job.attr.queryJob = true;
subplan.subplanType = SUBPLAN_TYPE_MERGE;
task.plan = &subplan;
void* p = taosMemoryCalloc(1, 1024);
schMgmt.queryMgmt = p;
schLaunchFetchTask(&job);
memset(&job, 0, sizeof(job));
memset(&subplan, 0, sizeof(subplan));
task.plan = NULL;
taosMemoryFreeClear(schMgmt.queryMgmt);
// flow ctrl
job.flowCtrl = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
SEp sep = {0};
SSchFlowControl nctrl = {0};
nctrl.taskList = taosArrayInit(1, POINTER_BYTES);
taosHashPut(job.flowCtrl, &sep, sizeof(SEp), &nctrl, sizeof(nctrl));
schFreeFlowCtrl(&job);
schMgmt.jobRef = -1;
}
void schtReset() {
insertJobRefId = 0;
queryJobRefId = 0;

View File

@ -157,7 +157,7 @@ int32_t walLoadMeta(SWal* pWal);
int32_t walSaveMeta(SWal* pWal);
int32_t walRemoveMeta(SWal* pWal);
int32_t walRollFileInfo(SWal* pWal);
int32_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx, int64_t* lastVer);
int32_t walCheckAndRepairMeta(SWal* pWal);
int32_t walCheckAndRepairIdx(SWal* pWal);

View File

@ -46,7 +46,7 @@ static FORCE_INLINE int walBuildTmpMetaName(SWal* pWal, char* buf) {
return snprintf(buf, WAL_FILE_LEN, "%s/meta-ver.tmp", pWal->path);
}
static FORCE_INLINE int32_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx, int64_t* lastVer) {
FORCE_INLINE int32_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx, int64_t* lastVer) {
int32_t code = 0, lino = 0;
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
int64_t retVer = -1;

View File

@ -127,7 +127,7 @@ class WalRetentionEnv : public ::testing::Test {
SWalCfg cfg;
cfg.rollPeriod = -1;
cfg.segSize = -1;
cfg.committed =-1;
cfg.committed = -1;
cfg.retentionPeriod = -1;
cfg.retentionSize = 0;
cfg.rollPeriod = 0;
@ -146,6 +146,83 @@ class WalRetentionEnv : public ::testing::Test {
const char* pathName = TD_TMP_DIR_PATH "wal_test";
};
class WalSkipLevel : public ::testing::Test {
protected:
static void SetUpTestCase() {
int code = walInit(NULL);
ASSERT(code == 0);
}
static void TearDownTestCase() { walCleanUp(); }
void walResetEnv() {
TearDown();
taosRemoveDir(pathName);
SetUp();
}
void SetUp() override {
SWalCfg cfg;
cfg.rollPeriod = -1;
cfg.segSize = -1;
cfg.committed = -1;
cfg.retentionPeriod = -1;
cfg.retentionSize = 0;
cfg.rollPeriod = 0;
cfg.vgId = 1;
cfg.level = TAOS_WAL_SKIP;
pWal = walOpen(pathName, &cfg);
ASSERT(pWal != NULL);
}
void TearDown() override {
walClose(pWal);
pWal = NULL;
}
SWal* pWal = NULL;
const char* pathName = TD_TMP_DIR_PATH "wal_test";
};
class WalEncrypted : public ::testing::Test {
protected:
static void SetUpTestCase() {
int code = walInit(NULL);
ASSERT(code == 0);
}
static void TearDownTestCase() { walCleanUp(); }
void walResetEnv() {
TearDown();
taosRemoveDir(pathName);
SetUp();
}
void SetUp() override {
SWalCfg cfg;
cfg.rollPeriod = -1;
cfg.segSize = -1;
cfg.committed = -1;
cfg.retentionPeriod = -1;
cfg.retentionSize = 0;
cfg.rollPeriod = 0;
cfg.vgId = 0;
cfg.level = TAOS_WAL_FSYNC;
cfg.encryptAlgorithm = 1;
pWal = walOpen(pathName, &cfg);
ASSERT(pWal != NULL);
}
void TearDown() override {
walClose(pWal);
pWal = NULL;
}
SWal* pWal = NULL;
const char* pathName = TD_TMP_DIR_PATH "wal_test";
};
TEST_F(WalCleanEnv, createNew) {
walRollFileInfo(pWal);
ASSERT(pWal->fileInfoSet != NULL);
@ -373,6 +450,183 @@ TEST_F(WalKeepEnv, readHandleRead) {
walCloseReader(pRead);
}
TEST_F(WalKeepEnv, walLogExist) {
walResetEnv();
int code;
SWalReader* pRead = walOpenReader(pWal, NULL, 0);
ASSERT(pRead != NULL);
int i;
for (i = 0; i < 100; i++) {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr);
code = walAppendLog(pWal, i, 0, syncMeta, newStr, len);
ASSERT_EQ(code, 0);
}
walLogExist(pWal, 0);
ASSERT_EQ(code, 0);
walCloseReader(pRead);
}
TEST_F(WalKeepEnv, walScanLogGetLastVerHeadMissMatch) {
walResetEnv();
int code;
do {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, 0);
sprintf(newStr, "%s-%d", ranStr, 0);
int len = strlen(newStr);
code = walAppendLog(pWal, 0, 0, syncMeta, newStr, len);
} while (0);
int i = 0;
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr);
int64_t offset = walGetCurFileOffset(pWal);
SWalFileInfo* pFileInfo = walGetCurFileInfo(pWal);
pWal->writeHead.head.version = i;
pWal->writeHead.head.bodyLen = len;
pWal->writeHead.head.msgType = 0;
pWal->writeHead.head.ingestTs = taosGetTimestampUs();
pWal->writeHead.head.syncMeta = syncMeta;
pWal->writeHead.cksumHead = 1;
pWal->writeHead.cksumBody = walCalcBodyCksum(newStr, len);
taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead));
taosWriteFile(pWal->pLogFile, newStr, len);
int64_t lastVer = 0;
code = walScanLogGetLastVer(pWal, 0, &lastVer);
ASSERT_EQ(code, TSDB_CODE_WAL_CHKSUM_MISMATCH);
}
TEST_F(WalKeepEnv, walScanLogGetLastVerBodyMissMatch) {
walResetEnv();
int code;
do {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, 0);
sprintf(newStr, "%s-%d", ranStr, 0);
int len = strlen(newStr);
code = walAppendLog(pWal, 0, 0, syncMeta, newStr, len);
} while (0);
int i = 0;
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr);
int64_t offset = walGetCurFileOffset(pWal);
SWalFileInfo* pFileInfo = walGetCurFileInfo(pWal);
pWal->writeHead.head.version = i;
pWal->writeHead.head.bodyLen = len;
pWal->writeHead.head.msgType = 0;
pWal->writeHead.head.ingestTs = taosGetTimestampUs();
pWal->writeHead.head.syncMeta = syncMeta;
pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
pWal->writeHead.cksumBody = 1;
taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead));
taosWriteFile(pWal->pLogFile, newStr, len);
int64_t lastVer = 0;
code = walScanLogGetLastVer(pWal, 0, &lastVer);
ASSERT_EQ(code, TSDB_CODE_WAL_CHKSUM_MISMATCH);
}
TEST_F(WalKeepEnv, walCheckAndRepairIdxFile) {
walResetEnv();
int code;
do {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, 0);
sprintf(newStr, "%s-%d", ranStr, 0);
int len = strlen(newStr);
code = walAppendLog(pWal, 0, 0, syncMeta, newStr, len);
} while (0);
SWalFileInfo* pFileInfo = walGetCurFileInfo(pWal);
for (int i = 1; i < 100; i++) {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr);
pWal->writeHead.head.version = i;
pWal->writeHead.head.bodyLen = len;
pWal->writeHead.head.msgType = 0;
pWal->writeHead.head.ingestTs = taosGetTimestampUs();
pWal->writeHead.head.syncMeta = syncMeta;
pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
pWal->writeHead.cksumBody = walCalcBodyCksum(newStr, len);
taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead));
taosWriteFile(pWal->pLogFile, newStr, len);
}
pWal->vers.lastVer = 99;
pFileInfo->lastVer = 99;
code = walCheckAndRepairIdx(pWal);
ASSERT_EQ(code, 0);
}
TEST_F(WalKeepEnv, walRestoreFromSnapshot1) {
walResetEnv();
int code;
int i;
for (i = 0; i < 100; i++) {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr);
code = walAppendLog(pWal, i, 0, syncMeta, newStr, len);
ASSERT_EQ(code, 0);
}
code = walRestoreFromSnapshot(pWal, 50);
ASSERT_EQ(code, 0);
}
TEST_F(WalKeepEnv, walRestoreFromSnapshot2) {
walResetEnv();
int code;
int i;
for (i = 0; i < 100; i++) {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr);
code = walAppendLog(pWal, i, 0, syncMeta, newStr, len);
ASSERT_EQ(code, 0);
}
SWalRef* ref = walOpenRef(pWal);
ref->refVer = 10;
code = walRestoreFromSnapshot(pWal, 99);
ASSERT_EQ(code, -1);
}
TEST_F(WalKeepEnv, walRollback) {
walResetEnv();
int code;
int i;
for (i = 0; i < 100; i++) {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr);
code = walAppendLog(pWal, i, 0, syncMeta, newStr, len);
ASSERT_EQ(code, 0);
}
code = walRollback(pWal, -1);
ASSERT_EQ(code, TSDB_CODE_WAL_INVALID_VER);
pWal->vers.lastVer = 50;
pWal->vers.commitVer = 40;
pWal->vers.snapshotVer = 40;
SWalFileInfo* fileInfo = walGetCurFileInfo(pWal);
code = walRollback(pWal, 48);
ASSERT_EQ(code, 0);
}
TEST_F(WalRetentionEnv, repairMeta1) {
walResetEnv();
int code;
@ -456,44 +710,6 @@ TEST_F(WalRetentionEnv, repairMeta1) {
walCloseReader(pRead);
}
class WalSkipLevel : public ::testing::Test {
protected:
static void SetUpTestCase() {
int code = walInit(NULL);
ASSERT(code == 0);
}
static void TearDownTestCase() { walCleanUp(); }
void walResetEnv() {
TearDown();
taosRemoveDir(pathName);
SetUp();
}
void SetUp() override {
SWalCfg cfg;
cfg.rollPeriod = -1;
cfg.segSize = -1;
cfg.committed =-1;
cfg.retentionPeriod = -1;
cfg.retentionSize = 0;
cfg.rollPeriod = 0;
cfg.vgId = 1;
cfg.level = TAOS_WAL_SKIP;
pWal = walOpen(pathName, &cfg);
ASSERT(pWal != NULL);
}
void TearDown() override {
walClose(pWal);
pWal = NULL;
}
SWal* pWal = NULL;
const char* pathName = TD_TMP_DIR_PATH "wal_test";
};
TEST_F(WalSkipLevel, restart) {
walResetEnv();
int code;
@ -533,4 +749,15 @@ TEST_F(WalSkipLevel, roll) {
ASSERT_EQ(code, 0);
code = walEndSnapshot(pWal);
ASSERT_EQ(code, 0);
}
TEST_F(WalEncrypted, write) {
int code;
for (int i = 0; i < 100; i++) {
code = walAppendLog(pWal, i, i + 1, syncMeta, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i);
}
code = walSaveMeta(pWal);
ASSERT_EQ(code, 0);
}

View File

@ -454,7 +454,8 @@ struct tm *taosGmTimeR(const time_t *timep, struct tm *result) {
return NULL;
}
#ifdef WINDOWS
return gmtime_s(result, timep);
errno_t code = gmtime_s(result, timep);
return (code == 0) ? result : NULL;
#else
return gmtime_r(timep, result);
#endif

View File

@ -94,6 +94,35 @@ TEST(osTimeTests, taosLocalTime) {
#endif
}
TEST(osTimeTests, taosGmTimeR) {
// Test 1: Test when both timep and result are not NULL
time_t timep = 1617531000; // 2021-04-04 18:10:00
struct tm tmInfo;
ASSERT_NE(taosGmTimeR(&timep, &tmInfo), nullptr);
char buf[128];
taosStrfTime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S", &tmInfo);
ASSERT_STREQ(buf, "2021-04-04T10:10:00");
}
TEST(osTimeTests, taosTimeGm) {
char *timestr= "2021-04-04T18:10:00";
struct tm tm = {0};
taosStrpTime(timestr, "%Y-%m-%dT%H:%M:%S", &tm);
int64_t seconds = taosTimeGm(&tm);
ASSERT_EQ(seconds, 1617559800);
}
TEST(osTimeTests, taosMktime) {
char *timestr= "2021-04-04T18:10:00";
struct tm tm = {0};
taosStrpTime(timestr, "%Y-%m-%dT%H:%M:%S", &tm);
time_t seconds = taosMktime(&tm, NULL);
ASSERT_EQ(seconds, 1617531000);
}
TEST(osTimeTests, invalidParameter) {
void *retp = NULL;
int32_t reti = 0;

View File

@ -519,7 +519,7 @@ int32_t cfgSetItemVal(SConfigItem *pItem, const char *name, const char *value, E
int32_t code = TSDB_CODE_SUCCESS;
if (pItem == NULL) {
TAOS_RETURN(TSDB_CODE_INVALID_CFG);
TAOS_RETURN(TSDB_CODE_CFG_NOT_FOUND);
}
switch (pItem->dtype) {
case CFG_DTYPE_BOOL: {
@ -629,6 +629,7 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p
cfgUnLock(pCfg);
TAOS_RETURN(code);
}
if ((pItem->category == CFG_CATEGORY_GLOBAL) && alterType == CFG_ALTER_DNODE) {
uError("failed to config:%s, not support update global config on only one dnode", name);
cfgUnLock(pCfg);

View File

@ -12,6 +12,10 @@
#include <gtest/gtest.h>
#include "tconfig.h"
#ifndef WINDOWS
#include "osFile.h"
#endif
class CfgTest : public ::testing::Test {
protected:
static void SetUpTestSuite() {}
@ -35,6 +39,9 @@ TEST_F(CfgTest, 01_Str) {
EXPECT_STREQ(cfgStypeStr(CFG_STYPE_ENV_CMD), "env_cmd");
EXPECT_STREQ(cfgStypeStr(CFG_STYPE_APOLLO_URL), "apollo_url");
EXPECT_STREQ(cfgStypeStr(CFG_STYPE_ARG_LIST), "arg_list");
EXPECT_STREQ(cfgStypeStr(CFG_STYPE_TAOS_OPTIONS), "taos_options");
EXPECT_STREQ(cfgStypeStr(CFG_STYPE_ALTER_CLIENT_CMD), "alter_client_cmd");
EXPECT_STREQ(cfgStypeStr(CFG_STYPE_ALTER_SERVER_CMD), "alter_server_cmd");
EXPECT_STREQ(cfgStypeStr(ECfgSrcType(1024)), "invalid");
EXPECT_STREQ(cfgDtypeStr(CFG_DTYPE_NONE), "none");
@ -47,6 +54,10 @@ TEST_F(CfgTest, 01_Str) {
EXPECT_STREQ(cfgDtypeStr(CFG_DTYPE_DIR), "dir");
EXPECT_STREQ(cfgDtypeStr(CFG_DTYPE_DIR), "dir");
EXPECT_STREQ(cfgDtypeStr(CFG_DTYPE_DIR), "dir");
EXPECT_STREQ(cfgDtypeStr(CFG_DTYPE_DOUBLE), "double");
EXPECT_STREQ(cfgDtypeStr(CFG_DTYPE_LOCALE), "locale");
EXPECT_STREQ(cfgDtypeStr(CFG_DTYPE_CHARSET), "charset");
EXPECT_STREQ(cfgDtypeStr(CFG_DTYPE_TIMEZONE), "timezone");
EXPECT_STREQ(cfgDtypeStr(ECfgDataType(1024)), "invalid");
}
@ -57,24 +68,30 @@ TEST_F(CfgTest, 02_Basic) {
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
ASSERT_NE(pConfig, nullptr);
EXPECT_EQ(cfgAddBool(pConfig, "test_bool", 0, 0, 0, 0), 0);
EXPECT_EQ(cfgAddInt32(pConfig, "test_int32", 1, 0, 16, 0, 0, 0), 0);
EXPECT_EQ(cfgAddInt64(pConfig, "test_int64", 2, 0, 16, 0, 0, 0), 0);
EXPECT_EQ(cfgAddFloat(pConfig, "test_float", 3, 0, 16, 0, 0, 0), 0);
EXPECT_EQ(cfgAddString(pConfig, "test_string", "4", 0, 0, 0), 0);
EXPECT_EQ(cfgAddDir(pConfig, "test_dir", TD_TMP_DIR_PATH, 0, 0, 0), 0);
EXPECT_EQ(cfgAddBool(pConfig, "test_bool", 0, 0, 6, 0), 0);
EXPECT_EQ(cfgAddInt32(pConfig, "test_int32", 21, 0, 16, 0, 1, 0), TSDB_CODE_OUT_OF_RANGE);
EXPECT_EQ(cfgAddInt32(pConfig, "test_int32", 1, 0, 16, 0, 1, 0), 0);
EXPECT_EQ(cfgAddInt64(pConfig, "test_int64", 21, 0, 16, 0, 2, 0), TSDB_CODE_OUT_OF_RANGE);
EXPECT_EQ(cfgAddInt64(pConfig, "test_int64", 2, 0, 16, 0, 2, 0), 0);
EXPECT_EQ(cfgAddFloat(pConfig, "test_float", 21, 0, 16, 0, 6, 0), TSDB_CODE_OUT_OF_RANGE);
EXPECT_EQ(cfgAddFloat(pConfig, "test_float", 3, 0, 16, 0, 6, 0), 0);
EXPECT_EQ(cfgAddString(pConfig, "test_string", "4", 0, 6, 0), 0);
EXPECT_EQ(cfgAddDir(pConfig, "test_dir", TD_TMP_DIR_PATH, 0, 6, 0), 0);
EXPECT_EQ(cfgGetSize(pConfig), 6);
int32_t size = cfgGetSize(pConfig);
SConfigItem* pItem = NULL;
SConfigItem *pItem = NULL;
SConfigIter *pIter = NULL;
code = cfgCreateIter(pConfig, &pIter);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
ASSERT_NE(pIter, nullptr);
while((pItem = cfgNextIter(pIter)) != NULL) {
while ((pItem = cfgNextIter(pIter)) != NULL) {
switch (pItem->dtype) {
case CFG_DTYPE_BOOL:
printf("index:%d, cfg:%s value:%d\n", size, pItem->name, pItem->bval);
@ -115,12 +132,16 @@ TEST_F(CfgTest, 02_Basic) {
EXPECT_EQ(pItem->dtype, CFG_DTYPE_INT32);
EXPECT_STREQ(pItem->name, "test_int32");
EXPECT_EQ(pItem->i32, 1);
code = cfgSetItem(pConfig, "test_int32", "21", CFG_STYPE_DEFAULT, true);
ASSERT_EQ(code, TSDB_CODE_OUT_OF_RANGE);
pItem = cfgGetItem(pConfig, "test_int64");
EXPECT_EQ(pItem->stype, CFG_STYPE_DEFAULT);
EXPECT_EQ(pItem->dtype, CFG_DTYPE_INT64);
EXPECT_STREQ(pItem->name, "test_int64");
EXPECT_EQ(pItem->i64, 2);
code = cfgSetItem(pConfig, "test_int64", "21", CFG_STYPE_DEFAULT, true);
ASSERT_EQ(code, TSDB_CODE_OUT_OF_RANGE);
pItem = cfgGetItem(pConfig, "test_float");
EXPECT_EQ(pItem->stype, CFG_STYPE_DEFAULT);
@ -140,5 +161,213 @@ TEST_F(CfgTest, 02_Basic) {
EXPECT_STREQ(pItem->name, "test_dir");
EXPECT_STREQ(pItem->str, TD_TMP_DIR_PATH);
code = cfgGetAndSetItem(pConfig, &pItem, "err_cfg", "err_val", CFG_STYPE_DEFAULT, true);
ASSERT_EQ(code, TSDB_CODE_CFG_NOT_FOUND);
code = cfgCheckRangeForDynUpdate(pConfig, "test_int32", "4", false, CFG_ALTER_LOCAL);
ASSERT_EQ(code, TSDB_CODE_INVALID_CFG);
code = cfgCheckRangeForDynUpdate(pConfig, "test_int64", "4", true, CFG_ALTER_LOCAL);
ASSERT_EQ(code, TSDB_CODE_INVALID_CFG);
code = cfgCheckRangeForDynUpdate(pConfig, "test_bool", "3", false, CFG_ALTER_LOCAL);
ASSERT_EQ(code, TSDB_CODE_OUT_OF_RANGE);
code = cfgCheckRangeForDynUpdate(pConfig, "test_int32", "74", true, CFG_ALTER_LOCAL);
ASSERT_EQ(code, TSDB_CODE_OUT_OF_RANGE);
code = cfgCheckRangeForDynUpdate(pConfig, "test_int64", "74", false, CFG_ALTER_LOCAL);
ASSERT_EQ(code, TSDB_CODE_OUT_OF_RANGE);
code = cfgCheckRangeForDynUpdate(pConfig, "test_float", "74", false, CFG_ALTER_LOCAL);
ASSERT_EQ(code, TSDB_CODE_OUT_OF_RANGE);
cfgCleanup(pConfig);
}
TEST_F(CfgTest, initWithArray) {
SConfig *pConfig = NULL;
int32_t code = cfgInit(&pConfig);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
ASSERT_NE(pConfig, nullptr);
EXPECT_EQ(cfgAddBool(pConfig, "test_bool", 0, 0, 0, 0), 0);
EXPECT_EQ(cfgAddInt32(pConfig, "test_int32", 1, 0, 16, 0, 0, 0), 0);
EXPECT_EQ(cfgAddInt64(pConfig, "test_int64", 2, 0, 16, 0, 0, 0), 0);
EXPECT_EQ(cfgAddFloat(pConfig, "test_float", 3, 0, 16, 0, 0, 0), 0);
EXPECT_EQ(cfgAddString(pConfig, "test_string", "4", 0, 0, 0), 0);
EXPECT_EQ(cfgAddDir(pConfig, "test_dir", TD_TMP_DIR_PATH, 0, 0, 0), 0);
SArray *pArgs = taosArrayInit(6, sizeof(SConfigPair));
SConfigPair *pPair = (SConfigPair *)taosMemoryMalloc(sizeof(SConfigPair));
pPair->name = "test_bool";
pPair->value = "1";
taosArrayPush(pArgs, pPair);
SConfigPair *pPair1 = (SConfigPair *)taosMemoryMalloc(sizeof(SConfigPair));
pPair1->name = "test_int32";
pPair1->value = "2";
taosArrayPush(pArgs, pPair1);
SConfigPair *pPair2 = (SConfigPair *)taosMemoryMalloc(sizeof(SConfigPair));
pPair2->name = "test_int64";
pPair2->value = "3";
taosArrayPush(pArgs, pPair2);
SConfigPair *pPair3 = (SConfigPair *)taosMemoryMalloc(sizeof(SConfigPair));
pPair3->name = "test_float";
pPair3->value = "4";
taosArrayPush(pArgs, pPair3);
SConfigPair *pPair4 = (SConfigPair *)taosMemoryMalloc(sizeof(SConfigPair));
pPair4->name = "test_string";
pPair4->value = "5";
taosArrayPush(pArgs, pPair4);
SConfigPair *pPair5 = (SConfigPair *)taosMemoryMalloc(sizeof(SConfigPair));
pPair5->name = "test_dir";
pPair5->value = TD_TMP_DIR_PATH;
taosArrayPush(pArgs, pPair5);
code = cfgLoadFromArray(pConfig, pArgs);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
}
TEST_F(CfgTest, cfgDumpItemCategory) {
SConfig *pConfig = NULL;
int32_t code = cfgInit(&pConfig);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
ASSERT_NE(pConfig, nullptr);
EXPECT_EQ(cfgAddBool(pConfig, "test_bool", 0, 0, 6, 100), 0);
SConfigItem *pItem = NULL;
pItem = cfgGetItem(pConfig, "test_bool");
EXPECT_EQ(pItem->stype, CFG_STYPE_DEFAULT);
EXPECT_EQ(pItem->dtype, CFG_DTYPE_BOOL);
EXPECT_STREQ(pItem->name, "test_bool");
EXPECT_EQ(pItem->bval, 0);
EXPECT_EQ(cfgDumpItemCategory(pItem, NULL, 0, 0), TSDB_CODE_INVALID_CFG);
}
TEST_F(CfgTest, cfgDumpCfgS3) {
SConfig *pConfig = NULL;
int32_t code = cfgInit(&pConfig);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
ASSERT_NE(pConfig, nullptr);
cfgAddInt32(pConfig, "s3MigrateIntervalSec", 60 * 60, 600, 100000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,
CFG_CATEGORY_GLOBAL);
cfgAddBool(pConfig, "s3MigrateEnabled", 60 * 60, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER, CFG_CATEGORY_GLOBAL);
cfgAddString(pConfig, "s3Accesskey", "", CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER_LAZY, CFG_CATEGORY_GLOBAL);
cfgAddString(pConfig, "s3Endpoint", "", CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER_LAZY, CFG_CATEGORY_GLOBAL);
cfgAddString(pConfig, "s3BucketName", "", CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER_LAZY, CFG_CATEGORY_GLOBAL);
cfgAddInt32(pConfig, "s3PageCacheSize", 10, 4, 1024 * 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER_LAZY,
CFG_CATEGORY_GLOBAL);
cfgAddInt32(pConfig, "s3UploadDelaySec", 10, 1, 60 * 60 * 24 * 30, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,
CFG_CATEGORY_GLOBAL);
cfgAddDir(pConfig, "scriptDir", configDir, CFG_SCOPE_BOTH, CFG_DYN_NONE, CFG_CATEGORY_LOCAL);
cfgDumpCfgS3(pConfig, false, false);
cfgDumpCfgS3(pConfig, true, true);
cfgDumpCfgS3(pConfig, false, true);
cfgDumpCfgS3(pConfig, true, false);
}
#ifndef WINDOWS
TEST_F(CfgTest, cfgLoadFromEnvVar) {
SConfig *pConfig = NULL;
int32_t code = cfgInit(&pConfig);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
ASSERT_NE(pConfig, nullptr);
EXPECT_EQ(cfgAddBool(pConfig, "test_bool", 0, 0, 6, 0), 0);
EXPECT_EQ(cfgAddInt32(pConfig, "test_int32", 21, 0, 16, 0, 1, 0), TSDB_CODE_OUT_OF_RANGE);
EXPECT_EQ(cfgAddInt32(pConfig, "test_int32", 1, 0, 16, 0, 1, 0), 0);
EXPECT_EQ(cfgAddInt64(pConfig, "test_int64", 21, 0, 16, 0, 2, 0), TSDB_CODE_OUT_OF_RANGE);
EXPECT_EQ(cfgAddInt64(pConfig, "test_int64", 2, 0, 16, 0, 2, 0), 0);
EXPECT_EQ(cfgAddFloat(pConfig, "test_float", 21, 0, 16, 0, 6, 0), TSDB_CODE_OUT_OF_RANGE);
EXPECT_EQ(cfgAddFloat(pConfig, "test_float", 3, 0, 16, 0, 6, 0), 0);
EXPECT_EQ(cfgAddString(pConfig, "test_string", "4", 0, 6, 0), 0);
EXPECT_EQ(cfgAddDir(pConfig, "test_dir", TD_TMP_DIR_PATH, 0, 6, 0), 0);
setenv("test_bool", "1", 1);
setenv("test_int32", "2", 1);
setenv("test_int64", "3", 1);
setenv("test_float", "4", 1);
setenv("test_string", "5", 1);
setenv("test_dir", TD_TMP_DIR_PATH, 1);
ASSERT_EQ(cfgLoad(pConfig, CFG_STYPE_ENV_VAR, "test_bool"), TSDB_CODE_SUCCESS);
}
TEST_F(CfgTest, cfgLoadFromEnvCmd) {
SConfig *pConfig = NULL;
int32_t code = cfgInit(&pConfig);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
ASSERT_NE(pConfig, nullptr);
EXPECT_EQ(cfgAddBool(pConfig, "test_bool", 0, 0, 6, 0), 0);
EXPECT_EQ(cfgAddInt32(pConfig, "test_int32", 21, 0, 16, 0, 1, 0), TSDB_CODE_OUT_OF_RANGE);
EXPECT_EQ(cfgAddInt32(pConfig, "test_int32", 1, 0, 16, 0, 1, 0), 0);
EXPECT_EQ(cfgAddInt64(pConfig, "test_int64", 21, 0, 16, 0, 2, 0), TSDB_CODE_OUT_OF_RANGE);
EXPECT_EQ(cfgAddInt64(pConfig, "test_int64", 2, 0, 16, 0, 2, 0), 0);
EXPECT_EQ(cfgAddFloat(pConfig, "test_float", 21, 0, 16, 0, 6, 0), TSDB_CODE_OUT_OF_RANGE);
EXPECT_EQ(cfgAddFloat(pConfig, "test_float", 3, 0, 16, 0, 6, 0), 0);
EXPECT_EQ(cfgAddString(pConfig, "test_string", "4", 0, 6, 0), 0);
const char *envCmd[] = {"test_bool=1", "test_int32=2", "test_int64=3", "test_float=4", "test_string=5", NULL};
ASSERT_EQ(cfgLoad(pConfig, CFG_STYPE_ENV_CMD, envCmd), TSDB_CODE_SUCCESS);
}
TEST_F(CfgTest, cfgLoadFromEnvFile) {
SConfig *pConfig = NULL;
int32_t code = cfgInit(&pConfig);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
ASSERT_NE(pConfig, nullptr);
TdFilePtr envFile = NULL;
const char *envFilePath = TD_TMP_DIR_PATH "envFile";
envFile = taosOpenFile(envFilePath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
const char *buf = "test_bool=1\ntest_int32=2\ntest_int64=3\ntest_float=4\ntest_string=5\n";
taosWriteFile(envFile, buf, strlen(buf));
taosCloseFile(&envFile);
ASSERT_EQ(cfgLoad(pConfig, CFG_STYPE_ENV_FILE, envFilePath), TSDB_CODE_SUCCESS);
taosRemoveFile(envFilePath);
}
TEST_F(CfgTest, cfgLoadFromApollUrl) {
SConfig *pConfig = NULL;
int32_t code = cfgInit(&pConfig);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
ASSERT_NE(pConfig, nullptr);
TdFilePtr jsonFile = NULL;
const char *jsonFilePath = TD_TMP_DIR_PATH "envJson.json";
jsonFile = taosOpenFile(jsonFilePath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
const char *buf =
"{\"test_bool\":\"1\",\"test_int32\":\"2\",\"test_int64\":\"3\",\"test_float\":\"4\",\"test_string\":\"5\"}";
taosWriteFile(jsonFile, buf, strlen(buf));
taosCloseFile(&jsonFile);
char str[256];
snprintf(str, sizeof(str), "jsonFile:%s", jsonFilePath);
ASSERT_EQ(cfgLoad(pConfig, CFG_STYPE_APOLLO_URL, str), 0);
taosRemoveFile(jsonFilePath);
}
#endif

View File

@ -2048,7 +2048,252 @@ TEST(DisablePoolFuncTest, MultiThreadTest) {
}
#endif
#if 1
TEST(functionsTest, internalFunc) {
char* caseName = "functionsTest:internalFunc";
int32_t code = 0;
int64_t msize = 10;
void* pSession = NULL;
void* pJob = NULL;
mptInitPool();
memset(mptCtx.jobCtxs, 0, sizeof(*mptCtx.jobCtxs));
assert(0 == taosMemPoolCallocJob(0, 0, (void**)&pJob));
assert(0 == taosMemPoolInitSession(gMemPoolHandle, &pSession, pJob, "id"));
int32_t loopTimes = 1;
int64_t st = 0;
void **addrList = (void**)taosMemCalloc(loopTimes, POINTER_BYTES);
// MALLOC
for (int32_t i = 0; i < loopTimes; ++i) {
addrList[i] = (char*)mptMemoryMalloc(msize);
}
mptFreeAddrList(addrList, loopTimes);
tsMemPoolFullFunc = 0;
mptEnableMemoryPoolUsage(gMemPoolHandle, pSession);
for (int32_t i = 0; i < loopTimes; ++i) {
addrList[i] = (char*)mptMemoryMalloc(msize);
}
mptDisableMemoryPoolUsage();
mptFreeAddrList(addrList, loopTimes);
tsMemPoolFullFunc = 1;
mptEnableMemoryPoolUsage(gMemPoolHandle, pSession);
for (int32_t i = 0; i < loopTimes; ++i) {
addrList[i] = (char*)mptMemoryMalloc(msize);
}
mptDisableMemoryPoolUsage();
mptFreeAddrList(addrList, loopTimes);
// CALLOC
tsMemPoolFullFunc = 0;
mptEnableMemoryPoolUsage(gMemPoolHandle, pSession);
for (int32_t i = 0; i < loopTimes; ++i) {
addrList[i] = (char*)mptMemoryCalloc(1, msize);
}
mptDisableMemoryPoolUsage();
mptFreeAddrList(addrList, loopTimes);
tsMemPoolFullFunc = 1;
mptEnableMemoryPoolUsage(gMemPoolHandle, pSession);
for (int32_t i = 0; i < loopTimes; ++i) {
addrList[i] = (char*)mptMemoryCalloc(1, msize);
}
mptDisableMemoryPoolUsage();
mptFreeAddrList(addrList, loopTimes);
for (int32_t i = 0; i < loopTimes; ++i) {
addrList[i] = (char*)mptMemoryCalloc(1, msize);
}
//mptFreeAddrList(addrList, loopTimes); NO FREE FOR REALLOC
// REALLOC
tsMemPoolFullFunc = 0;
mptEnableMemoryPoolUsage(gMemPoolHandle, pSession);
for (int32_t i = 0; i < loopTimes; ++i) {
addrList[i] = (char*)mptMemoryRealloc(addrList[i], msize);
}
mptDisableMemoryPoolUsage();
tsMemPoolFullFunc = 1;
mptEnableMemoryPoolUsage(gMemPoolHandle, pSession);
for (int32_t i = 0; i < loopTimes; ++i) {
addrList[i] = (char*)mptMemoryRealloc(addrList[i], msize);
}
mptDisableMemoryPoolUsage();
for (int32_t i = 0; i < loopTimes; ++i) {
addrList[i] = (char*)mptMemoryRealloc(addrList[i], msize);
}
mptFreeAddrList(addrList, loopTimes);
// STRDUP
tsMemPoolFullFunc = 0;
mptEnableMemoryPoolUsage(gMemPoolHandle, pSession);
for (int32_t i = 0; i < loopTimes; ++i) {
addrList[i] = (char*)mptStrdup("abc");
}
mptDisableMemoryPoolUsage();
mptFreeAddrList(addrList, loopTimes);
tsMemPoolFullFunc = 1;
mptEnableMemoryPoolUsage(gMemPoolHandle, pSession);
for (int32_t i = 0; i < loopTimes; ++i) {
addrList[i] = (char*)mptStrdup("abc");
}
mptDisableMemoryPoolUsage();
mptFreeAddrList(addrList, loopTimes);
for (int32_t i = 0; i < loopTimes; ++i) {
addrList[i] = (char*)mptStrdup("abc");
}
mptFreeAddrList(addrList, loopTimes);
// STRNDUP
tsMemPoolFullFunc = 0;
mptEnableMemoryPoolUsage(gMemPoolHandle, pSession);
for (int32_t i = 0; i < loopTimes; ++i) {
addrList[i] = (char*)mptStrndup("abc", 3);
}
mptDisableMemoryPoolUsage();
mptFreeAddrList(addrList, loopTimes);
tsMemPoolFullFunc = 1;
mptEnableMemoryPoolUsage(gMemPoolHandle, pSession);
for (int32_t i = 0; i < loopTimes; ++i) {
addrList[i] = (char*)mptStrndup("abc", 3);
}
mptDisableMemoryPoolUsage();
mptFreeAddrList(addrList, loopTimes);
for (int32_t i = 0; i < loopTimes; ++i) {
addrList[i] = (char*)mptStrndup("abc", 3);
}
mptFreeAddrList(addrList, loopTimes);
// ALIGNALLOC
tsMemPoolFullFunc = 0;
mptEnableMemoryPoolUsage(gMemPoolHandle, pSession);
for (int32_t i = 0; i < loopTimes; ++i) {
addrList[i] = (char*)mptMemoryMallocAlign(8, msize);
}
mptDisableMemoryPoolUsage();
mptFreeAddrList(addrList, loopTimes);
tsMemPoolFullFunc = 1;
mptEnableMemoryPoolUsage(gMemPoolHandle, pSession);
for (int32_t i = 0; i < loopTimes; ++i) {
addrList[i] = (char*)mptMemoryMallocAlign(8, msize);
}
mptDisableMemoryPoolUsage();
mptFreeAddrList(addrList, loopTimes);
for (int32_t i = 0; i < loopTimes; ++i) {
addrList[i] = (char*)mptMemoryMallocAlign(8, msize);
}
//mptFreeAddrList(addrList, loopTimes); NO FREE FOR GETSIZE
// GETSIZE
tsMemPoolFullFunc = 0;
mptEnableMemoryPoolUsage(gMemPoolHandle, pSession);
for (int32_t i = 0; i < loopTimes; ++i) {
mptMemorySize(addrList[i]);
}
mptDisableMemoryPoolUsage();
tsMemPoolFullFunc = 1;
mptEnableMemoryPoolUsage(gMemPoolHandle, pSession);
for (int32_t i = 0; i < loopTimes; ++i) {
mptMemorySize(addrList[i]);
}
mptDisableMemoryPoolUsage();
for (int32_t i = 0; i < loopTimes; ++i) {
mptMemorySize(addrList[i]);
}
// FREE
tsMemPoolFullFunc = 0;
mptEnableMemoryPoolUsage(gMemPoolHandle, pSession);
for (int32_t i = 0; i < loopTimes; ++i) {
mptMemoryFree(addrList[i]);
}
mptDisableMemoryPoolUsage();
for (int32_t i = 0; i < loopTimes; ++i) {
addrList[i] = (char*)mptMemoryMalloc(msize);
}
tsMemPoolFullFunc = 1;
mptEnableMemoryPoolUsage(gMemPoolHandle, pSession);
for (int32_t i = 0; i < loopTimes; ++i) {
mptMemoryFree(addrList[i]);
}
mptDisableMemoryPoolUsage();
for (int32_t i = 0; i < loopTimes; ++i) {
addrList[i] = (char*)mptMemoryMalloc(msize);
}
for (int32_t i = 0; i < loopTimes; ++i) {
mptMemoryFree(addrList[i]);
}
// TRIM
bool trimed = false;
tsMemPoolFullFunc = 0;
mptEnableMemoryPoolUsage(gMemPoolHandle, pSession);
for (int32_t i = 0; i < loopTimes; ++i) {
mptMemoryTrim(0, NULL);
mptMemoryTrim(0, &trimed);
}
mptDisableMemoryPoolUsage();
tsMemPoolFullFunc = 1;
mptEnableMemoryPoolUsage(gMemPoolHandle, pSession);
for (int32_t i = 0; i < loopTimes; ++i) {
mptMemoryTrim(0, NULL);
mptMemoryTrim(0, &trimed);
}
mptDisableMemoryPoolUsage();
}
#endif
#endif

View File

@ -0,0 +1,81 @@
import taos
import sys
import os
import subprocess
import glob
import shutil
import time
from frame.log import *
from frame.cases import *
from frame.sql import *
from frame.srvCtl import *
from frame.caseBase import *
from frame import *
from frame.autogen import *
from frame import epath
# from frame.server.dnodes import *
# from frame.server.cluster import *
class TDTestCase(TBase):
def init(self, conn, logSql, replicaVar=1):
updatecfgDict = {'dDebugFlag':131}
super(TDTestCase, self).init(conn, logSql, replicaVar=1, checkColName="c1")
self.valgrind = 0
self.db = "test"
self.stb = "meters"
self.childtable_count = 10
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.execute('CREATE DATABASE db vgroups 1 replica 2;')
time.sleep(1)
count = 0
while count < 100:
tdSql.query("show arbgroups;")
if tdSql.getData(0, 4) == 1:
break
tdLog.info("wait 1 seconds for is sync")
time.sleep(1)
count += 1
tdSql.query("show db.vgroups;")
if(tdSql.getData(0, 4) == "follower") and (tdSql.getData(0, 6) == "leader"):
tdLog.info("stop dnode2")
sc.dnodeStop(2)
if(tdSql.getData(0, 6) == "follower") and (tdSql.getData(0, 4) == "leader"):
tdLog.info("stop dnode 3")
sc.dnodeStop(3)
count = 0
while count < 100:
tdSql.query("show db.vgroups;")
if(tdSql.getData(0, 4) == "assigned ") or (tdSql.getData(0, 6) == "assigned "):
break
tdLog.info("wait 1 seconds for set assigned")
time.sleep(1)
count += 1
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -11,6 +11,7 @@
#
,,y,army,./pytest.sh python3 ./test.py -f multi-level/mlevel_basic.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f db-encrypt/basic.py -N 3 -M 3
,,y,army,./pytest.sh python3 ./test.py -f cluster/arbitrator.py -N 3
,,n,army,python3 ./test.py -f storage/s3/s3Basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f cluster/snapshot.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f query/function/test_func_elapsed.py
@ -365,6 +366,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/telemetry.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/backquote_check.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosdMonitor.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosdNewMonitor.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosd_audit.py
,,n,system-test,python3 ./test.py -f 0-others/taosdlog.py
,,n,system-test,python3 ./test.py -f 0-others/taosdShell.py -N 5 -M 3 -Q 3
@ -407,8 +409,10 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/persisit_config.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/qmemCtrl.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/compact_vgroups.py
,,n,system-test,python3 ./test.py -f 0-others/dumpsdb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/compact.py -N 3
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/composite_primary_key_create.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/composite_primary_key_insert.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/composite_primary_key_delete.py
@ -782,6 +786,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_querys.py -N 4 -M 1
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_vgroups.py -N 4 -M 1
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/compactDBConflict.py -N 3
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/mnodeEncrypt.py 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/between.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/distinct.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/varchar.py -Q 2

View File

@ -0,0 +1,98 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import os
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def caseDescription(self):
"""
dump sdb taosd -s
"""
def init(self, conn, logSql, replicaVar=1):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.tmpdir = "tmp"
def getPath(self, tool="taosd"):
if (platform.system().lower() == 'windows'):
tool = tool + ".exe"
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
paths = []
for root, dirs, files in os.walk(projPath):
if ((tool) in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
paths.append(os.path.join(root, tool))
break
if (len(paths) == 0):
tdLog.exit("%s not found!"%tool)
return
else:
tdLog.info("%s found in %s" % (tool, paths[0]))
return paths[0]
def run(self):
tdSql.execute("create database db keep 3649 ")
tdSql.execute("use db")
tdSql.execute(
"create table st(ts timestamp, c1 INT, c2 BOOL, c3 TINYINT, c4 SMALLINT, c5 BIGINT, c6 FLOAT, c7 DOUBLE, c8 TIMESTAMP, c9 BINARY(10), c10 NCHAR(10), c11 TINYINT UNSIGNED, c12 SMALLINT UNSIGNED, c13 INT UNSIGNED, c14 BIGINT UNSIGNED) tags(n1 INT, w2 BOOL, t3 TINYINT, t4 SMALLINT, t5 BIGINT, t6 FLOAT, t7 DOUBLE, t8 TIMESTAMP, t9 BINARY(10), t10 NCHAR(10), t11 TINYINT UNSIGNED, t12 SMALLINT UNSIGNED, t13 INT UNSIGNED, t14 BIGINT UNSIGNED)"
)
tdSql.execute(
"create table t1 using st tags(1, true, 1, 1, 1, 1.0, 1.0, 1, '1', '', 1, 1, 1, 1)"
)
tdSql.execute(
"insert into t1 values(1640000000000, 1, true, 1, 1, 1, 1.0, 1.0, 1, '1', '', 1, 1, 1, 1)"
)
tdSql.execute(
"create table t2 using st tags(NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)"
)
tdSql.execute(
"insert into t2 values(1640000000000, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)"
)
# sys.exit(1)
binPath = self.getPath()
if binPath == "":
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % binPath)
if os.path.exists("sdb.json"):
os.system("rm -f sdb.json")
os.system("%s -s" % binPath)
if not os.path.exists("sdb.json"):
tdLog.exit("taosd -s failed!")
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -0,0 +1,243 @@
import taos
import sys
import time
import socket
# import pexpect
import os
import http.server
import gzip
import threading
import json
import pickle
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
telemetryPort = '6043'
serverPort = '7080'
hostname = socket.gethostname()
class RequestHandlerImpl(http.server.BaseHTTPRequestHandler):
hostPort = hostname + ":" + serverPort
def telemetryInfoCheck(self, infoDict=''):
if len(infoDict) == 0:
return
if "ts" not in infoDict[0] or len(infoDict[0]["ts"]) == 0:
tdLog.exit("ts is null!")
if "protocol" not in infoDict[0] or infoDict[0]["protocol"] != 2:
tdLog.exit("protocol is null!")
if "tables" not in infoDict[0]:
tdLog.exit("tables is null!")
if infoDict[0]["tables"][0]["name"] != "taosd_dnodes_info":
tdLog.exit("taosd_dnodes_info is null!")
# dnode_info ====================================
dnode_infos = ['disk_engine', 'system_net_in', 'vnodes_num', 'system_net_out', 'uptime', 'has_mnode', 'io_read_disk', 'error_log_count',
'io_read', 'cpu_cores', 'has_qnode', 'has_snode', 'disk_total', 'mem_engine', 'info_log_count', 'cpu_engine', 'io_write_disk',
'debug_log_count', 'disk_used', 'mem_total', 'io_write', 'masters', 'cpu_system',
'trace_log_count', 'mem_free']
index = 0
for elem in dnode_infos:
tdLog.debug(f"elem: {index},{elem}")
if infoDict[0]["tables"][0]["metric_groups"][0]["metrics"][index]["name"] != elem:
tdLog.exit(f"{elem} is null!")
index += 1
if infoDict[0]["tables"][1]["name"] != "taosd_dnodes_log_dirs":
tdLog.exit("taosd_dnodes_log_dirs is null!")
# logdir
if infoDict[0]["tables"][1]["metric_groups"][0]["tags"][3]["name"] != "data_dir_name":
tdLog.exit("data_dir_name is null!")
if infoDict[0]["tables"][1]["metric_groups"][0]["metrics"][0]["name"] != "total":
tdLog.exit("total is null!")
if infoDict[0]["tables"][1]["metric_groups"][0]["metrics"][1]["name"] != "used":
tdLog.exit("used is null!")
if infoDict[0]["tables"][1]["metric_groups"][0]["metrics"][2]["name"] != "avail":
tdLog.exit("avail is null!")
if infoDict[0]["tables"][2]["name"] != "taosd_dnodes_data_dirs":
tdLog.exit("taosd_dnodes_data_dirs is null!")
# data_dir
if infoDict[0]["tables"][2]["metric_groups"][0]["tags"][3]["name"] != "data_dir_name":
tdLog.exit("data_dir_name is null!")
if infoDict[0]["tables"][2]["metric_groups"][0]["metrics"][0]["name"] != "avail":
tdLog.exit("total is null!")
if infoDict[0]["tables"][2]["metric_groups"][0]["metrics"][1]["name"] != "total":
tdLog.exit("used is null!")
if infoDict[0]["tables"][2]["metric_groups"][0]["metrics"][2]["name"] != "used":
tdLog.exit("avail is null!")
if infoDict[0]["tables"][3]["name"] != "taosd_cluster_info":
tdLog.exit("taosd_cluster_info is null!")
# cluster_info ====================================
if infoDict[0]["tables"][3]["metric_groups"][0]["metrics"][0]["name"] != "cluster_uptime":
tdLog.exit("cluster_uptime is null!")
if infoDict[0]["tables"][3]["metric_groups"][0]["metrics"][1]["name"] != "dbs_total":
tdLog.exit("dbs_total is null!")
if infoDict[0]["tables"][3]["metric_groups"][0]["metrics"][4]["name"] != "vgroups_total":
tdLog.exit("vgroups_total is null!")
if infoDict[0]["tables"][3]["metric_groups"][0]["metrics"][5]["name"] != "vgroups_alive":
tdLog.exit("vgroups_alive is null!")
if infoDict[0]["tables"][3]["metric_groups"][0]["metrics"][10]["name"] != "connections_total":
tdLog.exit("connections_total is null!")
if infoDict[0]["tables"][3]["metric_groups"][0]["metrics"][13]["name"] != "dnodes_total":
tdLog.exit("dnodes_total is null!")
# grant_info ====================================
if infoDict[0]["tables"][3]["metric_groups"][0]["metrics"][15]["name"] != "grants_expire_time":
tdLog.exit("grants_expire_time is null!")
if infoDict[0]["tables"][3]["metric_groups"][0]["metrics"][16]["name"] != "grants_timeseries_used":
tdLog.exit("grants_timeseries_used is null!")
if infoDict[0]["tables"][3]["metric_groups"][0]["metrics"][17]["name"] != "grants_timeseries_total":
tdLog.exit("grants_timeseries_total is null!")
# vgroup_infos ====================================
vgroup_infos_nums = len(infoDict[0]["tables"][4]["metric_groups"])
for index in range(vgroup_infos_nums):
if infoDict[0]["tables"][4]["metric_groups"][index]["metrics"][0]["name"] != "tables_num":
tdLog.exit("tables_num is null!")
if infoDict[0]["tables"][4]["metric_groups"][index]["metrics"][1]["name"] != "status":
tdLog.exit("status is null!")
if infoDict[0]["tables"][5]["name"] != "taosd_dnodes_status":
tdLog.exit("taosd_dnodes_status is null!")
if infoDict[0]["tables"][6]["name"] != "taosd_mnodes_info":
tdLog.exit("taosd_mnodes_info is null!")
if infoDict[0]["tables"][7]["name"] != "taosd_vnodes_info":
tdLog.exit("taosd_vnodes_info is null!")
def do_GET(self):
"""
process GET request
"""
def do_POST(self):
"""
process POST request
"""
contentEncoding = self.headers["Content-Encoding"]
if contentEncoding == 'gzip':
req_body = self.rfile.read(int(self.headers["Content-Length"]))
plainText = gzip.decompress(req_body).decode()
else:
plainText = self.rfile.read(int(self.headers["Content-Length"])).decode()
print(plainText)
# 1. send response code and header
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.end_headers()
# 2. send response content
#self.wfile.write(("Hello World: " + req_body + "\n").encode("utf-8"))
# 3. check request body info
infoDict = json.loads(plainText)
#print("================")
# print(infoDict)
self.telemetryInfoCheck(infoDict)
# 4. shutdown the server and exit case
assassin = threading.Thread(target=self.server.shutdown)
assassin.daemon = True
assassin.start()
print ("==== shutdown http server ====")
class TDTestCase:
global hostname
global serverPort
if (platform.system().lower() == 'windows' and not tdDnodes.dnodes[0].remoteIP == ""):
try:
config = eval(tdDnodes.dnodes[0].remoteIP )
hostname = config["host"]
except Exception:
hostname = tdDnodes.dnodes[0].remoteIP
rpcDebugFlagVal = '143'
clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
clientCfgDict["serverPort"] = serverPort
clientCfgDict["firstEp"] = hostname + ':' + serverPort
clientCfgDict["secondEp"] = hostname + ':' + serverPort
clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
clientCfgDict["fqdn"] = hostname
updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
updatecfgDict["clientCfg"] = clientCfgDict
updatecfgDict["serverPort"] = serverPort
updatecfgDict["firstEp"] = hostname + ':' + serverPort
updatecfgDict["secondEp"] = hostname + ':' + serverPort
updatecfgDict["fqdn"] = hostname
updatecfgDict["monitorFqdn"] = hostname
updatecfgDict["monitorPort"] = '6043'
updatecfgDict["monitor"] = '1'
updatecfgDict["monitorInterval"] = "5"
updatecfgDict["monitorMaxLogs"] = "10"
updatecfgDict["monitorComp"] = "1"
updatecfgDict["monitorForceV2"] = "1"
updatecfgDict["audit"] = '0'
print ("===================: ", updatecfgDict)
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
# time.sleep(2)
vgroups = "4"
sql = "create database db3 vgroups " + vgroups
tdSql.query(sql)
sql = "create table db3.stb (ts timestamp, f int) tags (t int)"
tdSql.query(sql)
sql = "create table db3.tb using db3.stb tags (1)"
tdSql.query(sql)
# create http server: bing ip/port , and request processor
if (platform.system().lower() == 'windows' and not tdDnodes.dnodes[0].remoteIP == ""):
RequestHandlerImplStr = base64.b64encode(pickle.dumps(RequestHandlerImpl)).decode()
cmdStr = "import pickle\nimport http\nRequestHandlerImpl=pickle.loads(base64.b64decode(\"%s\".encode()))\nclass NewRequestHandlerImpl(RequestHandlerImpl):\n hostPort = \'%s\'\nhttp.server.HTTPServer((\"\", %s), NewRequestHandlerImpl).serve_forever()"%(RequestHandlerImplStr,hostname+":"+serverPort,telemetryPort)
tdDnodes.dnodes[0].remoteExec({}, cmdStr)
else:
serverAddress = ("", int(telemetryPort))
http.server.HTTPServer(serverAddress, RequestHandlerImpl).serve_forever()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,69 @@
from ssl import ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE
from numpy import row_stack
import taos
import sys
import time
import os
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import TDDnodes
from util.dnodes import TDDnode
from util.cluster import *
sys.path.append("./6-cluster")
from clusterCommonCreate import *
from clusterCommonCheck import clusterComCheck
import time
import socket
import subprocess
from multiprocessing import Process
import threading
import time
import inspect
import ctypes
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
tdLog.debug(f"start to excute {__file__}")
self.TDDnodes = None
tdSql.init(conn.cursor())
self.host = socket.gethostname()
def run(self):
tdSql.execute('create database if not exists db');
tdSql.execute('use db')
tdSql.execute('create table st (ts timestamp, i int, j float, k double) tags(a int)')
for i in range(0, 2):
tdSql.execute("create table if not exists db.t%d using db.st tags(%d)" % (i, i))
for i in range(2, 4):
tdSql.execute("create table if not exists db.t%d using db.st tags(%d)" % (i, i))
sql = "show db.tables"
tdSql.query(sql)
tdSql.checkRows(4)
timestamp = 1530374400000
for i in range (4) :
val = i
sql = "insert into db.t%d values(%d, %d, %d, %d)" % (i, timestamp, val, val, val)
tdSql.execute(sql)
for i in range ( 4) :
val = i
sql = "select * from db.t%d" % (i)
tdSql.query(sql)
tdSql.checkRows(1)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,50 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
from util.cases import *
from util.sql import *
from util.dnodes import *
from util.log import *
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
tdLog.debug(f"start to init {__file__}")
self.replicaVar = int(replicaVar)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.execute('CREATE DATABASE db vgroups 1 replica 2;')
time.sleep(1)
tdSql.query("show db.vgroups;")
if(tdSql.queryResult[0][4] == "follower") and (tdSql.queryResult[0][6] == "leader"):
tdLog.info("stop dnode2")
sc.dnodeStop(2)
if(tdSql.queryResult[0][6] == "follower") and (tdSql.queryResult[0][4] == "leader"):
tdLog.info("stop dnode 3")
sc.dnodeStop(3)
tdLog.info("wait 10 seconds")
time.sleep(10)
tdSql.query("show db.vgroups;")
if(tdSql.queryResult[0][4] != "assigned") and (tdSql.queryResult[0][6] != "assigned"):
tdLog.exit("failed to set aasigned")
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -70,10 +70,10 @@ require (
github.com/ugorji/go/codec v1.2.12 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
golang.org/x/arch v0.7.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/ini.v1 v1.66.4 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect

View File

@ -424,8 +424,8 @@ golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@ -573,8 +573,8 @@ golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -583,8 +583,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=