Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/submit_req

This commit is contained in:
Hongze Cheng 2022-11-30 19:28:52 +08:00
commit 4b5671a631
25 changed files with 838 additions and 277 deletions

View File

@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG cf30c86
GIT_TAG b103d9b
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -69,7 +69,7 @@ TDengine 的主要功能如下:
- **[分析能力](https://www.taosdata.com/tdengine/easy_data_analytics)**通过超级表、存储计算分离、分区分片、预计算和其它技术TDengine 能够高效地浏览、格式化和访问数据。
- **[核心开源](https://www.taosdata.com/tdengine/open_source_time-series_database)**TDengine 的核心代码包括集群功能全部在开源协议下公开。全球超过 140k 个运行实例GitHub Star 19k且拥有一个活跃的开发者社区。
- **[核心开源](https://www.taosdata.com/tdengine/open_source_time-series_database)**TDengine 的核心代码包括集群功能全部在开源协议下公开。全球超过 140k 个运行实例GitHub Star 20k且拥有一个活跃的开发者社区。
采用 TDengine可将典型的物联网、车联网、工业互联网大数据平台的总拥有成本大幅降低。表现在几个方面

View File

@ -67,6 +67,7 @@ typedef struct SWal SWal;
typedef struct SSyncRaftEntry SSyncRaftEntry;
typedef enum {
TAOS_SYNC_STATE_OFFLINE = 0,
TAOS_SYNC_STATE_FOLLOWER = 100,
TAOS_SYNC_STATE_CANDIDATE = 101,
TAOS_SYNC_STATE_LEADER = 102,

View File

@ -169,6 +169,9 @@ void taosSetMaskSIGPIPE();
uint32_t taosInetAddr(const char *ipAddr);
const char *taosInetNtoa(struct in_addr ipInt, char *dstStr, int32_t len);
uint64_t taosHton64(uint64_t val);
uint64_t taosNtoh64(uint64_t val);
#ifdef __cplusplus
}
#endif

View File

@ -206,16 +206,15 @@ static const SSysDbTableSchema vgroupsSchema[] = {
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "tables", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "v1_dnode", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "v1_status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "v2_dnode", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "v2_status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "v3_dnode", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "v3_status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "v1_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
{.name = "v1_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "v2_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
{.name = "v2_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "v3_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
{.name = "v3_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "v4_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
{.name = "v4_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "cacheload", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "nfiles", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "file_size", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "tsma", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = true},
};

View File

@ -150,7 +150,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
SServerStatusRsp statusRsp = {0};
SMonMloadInfo minfo = {0};
(*pMgmt->getMnodeLoadsFp)(&minfo);
if (minfo.isMnode && minfo.load.syncState == TAOS_SYNC_STATE_ERROR) {
if (minfo.isMnode && (minfo.load.syncState == TAOS_SYNC_STATE_ERROR || minfo.load.syncState == TAOS_SYNC_STATE_OFFLINE)) {
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState));
return;
@ -160,7 +160,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
(*pMgmt->getVnodeLoadsFp)(&vinfo);
for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) {
SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i);
if (pLoad->syncState == TAOS_SYNC_STATE_ERROR) {
if (pLoad->syncState == TAOS_SYNC_STATE_ERROR || pLoad->syncState == TAOS_SYNC_STATE_OFFLINE) {
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pLoad->vgId,
syncStr(pLoad->syncState));

View File

@ -36,8 +36,6 @@ int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup);
SArray *mndBuildDnodesArray(SMnode *, int32_t exceptDnodeId);
int32_t mndAllocSmaVgroup(SMnode *, SDbObj *pDb, SVgObj *pVgroup);
int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups);
int32_t mndAddVnodeToVgroup(SMnode *, SVgObj *pVgroup, SArray *pArray);
int32_t mndRemoveVnodeFromVgroup(SMnode *, SVgObj *pVgroup, SArray *pArray, SVnodeGid *pDelVgid);
int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid);
int32_t mndAddAlterVnodeConfirmAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup);
int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType);

View File

@ -151,10 +151,10 @@ static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs)
bool roleChanged = false;
for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
if (pVgroup->vnodeGid[vg].dnodeId == dnodeId) {
if (pVgroup->vnodeGid[vg].syncState != TAOS_SYNC_STATE_ERROR) {
if (pVgroup->vnodeGid[vg].syncState != TAOS_SYNC_STATE_OFFLINE) {
mInfo("vgId:%d, state changed by offline check, old state:%s restored:%d new state:error restored:0",
pVgroup->vgId, syncStr(pVgroup->vnodeGid[vg].syncState), pVgroup->vnodeGid[vg].syncRestore);
pVgroup->vnodeGid[vg].syncState = TAOS_SYNC_STATE_ERROR;
pVgroup->vnodeGid[vg].syncState = TAOS_SYNC_STATE_OFFLINE;
pVgroup->vnodeGid[vg].syncRestore = 0;
roleChanged = true;
}
@ -491,6 +491,15 @@ void mndPreClose(SMnode *pMnode) {
if (pMnode != NULL) {
syncLeaderTransfer(pMnode->syncMgmt.sync);
syncPreStop(pMnode->syncMgmt.sync);
while (syncSnapshotRecving(pMnode->syncMgmt.sync)) {
mInfo("vgId:1, snapshot is recving");
taosMsleep(300);
}
while (syncSnapshotSending(pMnode->syncMgmt.sync)) {
mInfo("vgId:1, snapshot is sending");
taosMsleep(300);
}
}
}
@ -747,7 +756,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
tstrncpy(desc.status, "ready", sizeof(desc.status));
pClusterInfo->vgroups_alive++;
}
if (pVgid->syncState != TAOS_SYNC_STATE_ERROR) {
if (pVgid->syncState != TAOS_SYNC_STATE_ERROR && pVgid->syncState != TAOS_SYNC_STATE_OFFLINE) {
pClusterInfo->vnodes_alive++;
}
pClusterInfo->vnodes_total++;

View File

@ -185,7 +185,7 @@ static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) {
return -1;
}
pObj->syncState = TAOS_SYNC_STATE_ERROR;
pObj->syncState = TAOS_SYNC_STATE_OFFLINE;
mndReloadSyncConfig(pSdb->pMnode);
return 0;
}

View File

@ -179,6 +179,22 @@ static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
pOld->hashEnd = pNew->hashEnd;
pOld->replica = pNew->replica;
pOld->isTsma = pNew->isTsma;
for (int32_t i = 0; i < pNew->replica; ++i) {
SVnodeGid *pNewGid = &pNew->vnodeGid[i];
for (int32_t j = 0; j < pOld->replica; ++j) {
SVnodeGid *pOldGid = &pOld->vnodeGid[j];
if (pNewGid->dnodeId == pOldGid->dnodeId) {
pNewGid->syncState = pOldGid->syncState;
pNewGid->syncRestore = pOldGid->syncRestore;
}
}
}
pNew->numOfTables = pOld->numOfTables;
pNew->numOfTimeSeries = pOld->numOfTimeSeries;
pNew->totalStorage = pOld->totalStorage;
pNew->compStorage = pOld->compStorage;
pNew->pointsWritten = pOld->pointsWritten;
pNew->compact = pOld->compact;
memcpy(pOld->vnodeGid, pNew->vnodeGid, TSDB_MAX_REPLICA * sizeof(SVnodeGid));
return 0;
}
@ -659,11 +675,12 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->numOfTables, false);
// default 3 replica
for (int32_t i = 0; i < 3; ++i) {
// default 3 replica, add 1 replica if move vnode
for (int32_t i = 0; i < 4; ++i) {
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
if (i < pVgroup->replica) {
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->vnodeGid[i].dnodeId, false);
int16_t dnodeId = (int16_t)pVgroup->vnodeGid[i].dnodeId;
colDataAppend(pColInfo, numOfRows, (const char *)&dnodeId, false);
bool exist = false;
bool online = false;
@ -695,16 +712,8 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppendNULL(pColInfo, numOfRows);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->cacheUsage, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppendNULL(pColInfo, numOfRows);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppendNULL(pColInfo, numOfRows);
int32_t cacheUsage = (int32_t)pVgroup->cacheUsage;
colDataAppend(pColInfo, numOfRows, (const char *)&cacheUsage, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->isTsma, false);
@ -851,7 +860,7 @@ static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) {
sdbCancelFetch(pSdb, pIter);
}
int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
static int32_t mndAddVnodeToVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray) {
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
SDnodeObj *pDnode = taosArrayGet(pArray, i);
@ -887,12 +896,21 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
}
pVgid->dnodeId = pDnode->id;
pVgid->syncState = TAOS_SYNC_STATE_ERROR;
pVgid->syncState = TAOS_SYNC_STATE_OFFLINE;
mInfo("db:%s, vgId:%d, vn:%d is added, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
pVgroup->dbName, pVgroup->vgId, pVgroup->replica, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
pVgroup->replica++;
pDnode->numOfVnodes++;
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
if (pVgRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) {
sdbFreeRaw(pVgRaw);
return -1;
}
(void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
return 0;
}
@ -901,7 +919,8 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
return -1;
}
int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *pDelVgid) {
static int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
SVnodeGid *pDelVgid) {
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
SDnodeObj *pDnode = taosArrayGet(pArray, i);
@ -941,6 +960,15 @@ _OVER:
SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
}
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
if (pVgRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) {
sdbFreeRaw(pVgRaw);
return -1;
}
(void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
return 0;
}
@ -1088,7 +1116,7 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
if (!force) {
mInfo("vgId:%d, will add 1 vnode", pVgroup->vgId);
if (mndAddVnodeToVgroup(pMnode, &newVg, pArray) != 0) return -1;
if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
for (int32_t i = 0; i < newVg.replica - 1; ++i) {
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
}
@ -1100,6 +1128,16 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
SVnodeGid del = newVg.vnodeGid[vnIndex];
newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
{
SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
if (pRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRaw) != 0) {
sdbFreeRaw(pRaw);
return -1;
}
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
}
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
for (int32_t i = 0; i < newVg.replica; ++i) {
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
@ -1107,11 +1145,20 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
} else {
mInfo("vgId:%d, will add 1 vnode and force remove 1 vnode", pVgroup->vgId);
if (mndAddVnodeToVgroup(pMnode, &newVg, pArray) != 0) return -1;
if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
newVg.replica--;
SVnodeGid del = newVg.vnodeGid[vnIndex];
newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
{
SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
if (pRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRaw) != 0) {
sdbFreeRaw(pRaw);
return -1;
}
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
}
for (int32_t i = 0; i < newVg.replica; ++i) {
if (i != vnIndex) {
@ -1128,16 +1175,12 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
{
SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
if (pRaw == NULL || mndTransAppendRedolog(pTrans, pRaw) != 0) return -1;
if (pRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
sdbFreeRaw(pRaw);
return -1;
}
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
pRaw = NULL;
}
{
SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) return -1;
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
pRaw = NULL;
}
mInfo("vgId:%d, vgroup info after move, replica:%d", newVg.vgId, newVg.replica);
@ -1193,7 +1236,15 @@ static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDb
SVnodeGid *pGid = &pVgroup->vnodeGid[pVgroup->replica];
pVgroup->replica++;
pGid->dnodeId = newDnodeId;
pGid->syncState = TAOS_SYNC_STATE_ERROR;
pGid->syncState = TAOS_SYNC_STATE_OFFLINE;
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
if (pVgRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) {
sdbFreeRaw(pVgRaw);
return -1;
}
(void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
for (int32_t i = 0; i < pVgroup->replica - 1; ++i) {
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[i].dnodeId) != 0) return -1;
@ -1224,6 +1275,14 @@ static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, S
memcpy(pGid, &pVgroup->vnodeGid[pVgroup->replica], sizeof(SVnodeGid));
memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
if (pVgRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) {
sdbFreeRaw(pVgRaw);
return -1;
}
(void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true) != 0) return -1;
for (int32_t i = 0; i < pVgroup->replica; ++i) {
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, pVgroup, pVgroup->vnodeGid[i].dnodeId) != 0) return -1;
@ -1236,9 +1295,8 @@ static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, S
static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup, SDnodeObj *pNew1,
SDnodeObj *pOld1, SDnodeObj *pNew2, SDnodeObj *pOld2, SDnodeObj *pNew3,
SDnodeObj *pOld3) {
int32_t code = -1;
SSdbRaw *pRaw = NULL;
STrans *pTrans = NULL;
int32_t code = -1;
STrans *pTrans = NULL;
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "red-vgroup");
if (pTrans == NULL) goto _OVER;
@ -1319,17 +1377,13 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb,
}
{
pRaw = mndVgroupActionEncode(&newVg);
if (pRaw == NULL || mndTransAppendRedolog(pTrans, pRaw) != 0) goto _OVER;
SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
if (pRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
sdbFreeRaw(pRaw);
return -1;
}
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
pRaw = NULL;
}
{
pRaw = mndVgroupActionEncode(&newVg);
if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
pRaw = NULL;
}
mInfo("vgId:%d, vgroup info after redistribute, replica:%d", newVg.vgId, newVg.replica);
@ -1342,7 +1396,6 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb,
_OVER:
mndTransDrop(pTrans);
sdbFreeRaw(pRaw);
mndReleaseDb(pMnode, pDb);
return code;
}
@ -1593,13 +1646,13 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
pVgroup->vnodeGid[0].dnodeId);
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
if (mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray) != 0) return -1;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0)
return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1]) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
if (mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray) != 0) return -1;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0)
return -1;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[1].dnodeId) != 0)
@ -1612,7 +1665,7 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
SVnodeGid del1 = {0};
SVnodeGid del2 = {0};
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1) != 0) return -1;
if (mndRemoveVnodeFromVgroup(pMnode, pTrans, &newVgroup, pArray, &del1) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true) != 0) return -1;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0)
return -1;
@ -1620,7 +1673,7 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del2) != 0) return -1;
if (mndRemoveVnodeFromVgroup(pMnode, pTrans, &newVgroup, pArray, &del2) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true) != 0) return -1;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0)
return -1;
@ -1629,16 +1682,6 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
return -1;
}
{
SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
if (pVgRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) {
sdbFreeRaw(pVgRaw);
return -1;
}
(void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
}
{
SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
if (pVgRaw == NULL) return -1;
@ -1657,10 +1700,9 @@ static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans,
}
static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
int32_t code = -1;
SSdbRaw *pRaw = NULL;
STrans *pTrans = NULL;
SArray *pArray = mndBuildDnodesArray(pMnode, 0);
int32_t code = -1;
STrans *pTrans = NULL;
SArray *pArray = mndBuildDnodesArray(pMnode, 0);
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "split-vgroup");
if (pTrans == NULL) goto _OVER;
@ -1676,13 +1718,13 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
}
if (newVg1.replica == 1) {
if (mndAddVnodeToVgroup(pMnode, &newVg1, pArray) != 0) goto _OVER;
if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg1, pArray) != 0) goto _OVER;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId) != 0) goto _OVER;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1]) != 0) goto _OVER;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
} else if (newVg1.replica == 3) {
SVnodeGid del1 = {0};
if (mndRemoveVnodeFromVgroup(pMnode, &newVg1, pArray, &del1) != 0) goto _OVER;
if (mndRemoveVnodeFromVgroup(pMnode, pTrans, &newVg1, pArray, &del1) != 0) goto _OVER;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true) != 0) goto _OVER;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId) != 0) goto _OVER;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId) != 0) goto _OVER;
@ -1727,17 +1769,23 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
#endif
{
pRaw = mndVgroupActionEncode(&newVg1);
if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
SSdbRaw *pRaw = mndVgroupActionEncode(&newVg1);
if (pRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
sdbFreeRaw(pRaw);
return -1;
}
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
pRaw = NULL;
}
{
pRaw = mndVgroupActionEncode(&newVg2);
if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
SSdbRaw *pRaw = mndVgroupActionEncode(&newVg2);
if (pRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
sdbFreeRaw(pRaw);
return -1;
}
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
pRaw = NULL;
}
mInfo("vgId:%d, vgroup info after adjust hash, replica:%d hashBegin:%u hashEnd:%u vnode:0 dnode:%d", newVg1.vgId,
@ -1757,7 +1805,6 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
_OVER:
taosArrayDestroy(pArray);
mndTransDrop(pTrans);
sdbFreeRaw(pRaw);
return code;
}
@ -1802,16 +1849,8 @@ static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SD
{
SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
if (pRaw == NULL || mndTransAppendRedolog(pTrans, pRaw) != 0) {
sdbFreeRaw(pRaw);
return -1;
}
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
}
{
SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) {
if (pRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
sdbFreeRaw(pRaw);
return -1;
}

View File

@ -775,7 +775,10 @@ static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_
} else {
varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);
memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
if (pColVal->value.nData > 0) { // pData may be null, if nData is 0
memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
}
colDataAppend(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
}
} else {

View File

@ -515,6 +515,16 @@ void vnodeSyncPreClose(SVnode *pVnode) {
vInfo("vgId:%d, pre close sync", pVnode->config.vgId);
syncLeaderTransfer(pVnode->sync);
syncPreStop(pVnode->sync);
while (syncSnapshotRecving(pVnode->sync)) {
vInfo("vgId:%d, snapshot is recving", pVnode->config.vgId);
taosMsleep(300);
}
while (syncSnapshotSending(pVnode->sync)) {
vInfo("vgId:%d, snapshot is sending", pVnode->config.vgId);
taosMsleep(300);
}
taosThreadMutexLock(&pVnode->lock);
if (pVnode->blocked) {
vInfo("vgId:%d, post block after close sync", pVnode->config.vgId);

View File

@ -1052,6 +1052,9 @@ static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts,
}
static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
if (pBlock->info.rows == 0) {
return false;
}
if ((*pRowIndex) == pBlock->info.rows) {
return false;
}
@ -1184,10 +1187,10 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
}
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
blockDataCleanup(pDestBlock);
if (pSrcBlock->info.rows == 0) {
return TSDB_CODE_SUCCESS;
}
blockDataCleanup(pDestBlock);
int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -1837,6 +1840,12 @@ FETCH_NEXT_BLOCK:
}
setBlockGroupIdByUid(pInfo, pDelBlock);
printDataBlock(pDelBlock, "stream scan delete recv filtered");
if (pDelBlock->info.rows == 0) {
if (pInfo->tqReader) {
blockDataDestroy(pDelBlock);
}
goto FETCH_NEXT_BLOCK;
}
if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;

View File

@ -133,6 +133,14 @@ static void i8VectorSumAVX2(const int8_t* plist, int32_t numOfRows, int32_t type
sum = _mm256_add_epi64(sum, extVal);
p += width;
}
// let sum up the final results
const int64_t* q = (const int64_t*)&sum;
pRes->sum.isum += q[0] + q[1] + q[2] + q[3];
for (int32_t j = 0; j < remainder; ++j) {
pRes->sum.isum += plist[j + rounds * width];
}
} else {
const uint8_t* p = (const uint8_t*)plist;
@ -142,16 +150,16 @@ static void i8VectorSumAVX2(const int8_t* plist, int32_t numOfRows, int32_t type
sum = _mm256_add_epi64(sum, extVal);
p += width;
}
// let sum up the final results
const uint64_t* q = (const uint64_t*)&sum;
pRes->sum.usum += q[0] + q[1] + q[2] + q[3];
for (int32_t j = 0; j < remainder; ++j) {
pRes->sum.usum += (uint8_t)plist[j + rounds * width];
}
}
// let sum up the final results
const int64_t* q = (const int64_t*)&sum;
pRes->sum.isum += q[0] + q[1] + q[2] + q[3];
int32_t startIndex = rounds * width;
for (int32_t j = 0; j < remainder; ++j) {
pRes->sum.isum += plist[j + startIndex];
}
#endif
}
@ -176,8 +184,16 @@ static void i16VectorSumAVX2(const int16_t* plist, int32_t numOfRows, int32_t ty
sum = _mm256_add_epi64(sum, extVal);
p += width;
}
// let sum up the final results
const int64_t* q = (const int64_t*)&sum;
pRes->sum.isum += q[0] + q[1] + q[2] + q[3];
for (int32_t j = 0; j < remainder; ++j) {
pRes->sum.isum += plist[j + rounds * width];
}
} else {
const uint8_t* p = (const uint8_t*)plist;
const uint16_t* p = (const uint16_t*)plist;
for(int32_t i = 0; i < rounds; ++i) {
__m128i val = _mm_lddqu_si128((__m128i*)p);
@ -185,16 +201,16 @@ static void i16VectorSumAVX2(const int16_t* plist, int32_t numOfRows, int32_t ty
sum = _mm256_add_epi64(sum, extVal);
p += width;
}
// let sum up the final results
const uint64_t* q = (const uint64_t*)&sum;
pRes->sum.usum += q[0] + q[1] + q[2] + q[3];
for (int32_t j = 0; j < remainder; ++j) {
pRes->sum.usum += (uint16_t)plist[j + rounds * width];
}
}
// let sum up the final results
const int64_t* q = (const int64_t*)&sum;
pRes->sum.isum += q[0] + q[1] + q[2] + q[3];
int32_t startIndex = rounds * width;
for (int32_t j = 0; j < remainder; ++j) {
pRes->sum.isum += plist[j + startIndex];
}
#endif
}
@ -219,6 +235,14 @@ static void i32VectorSumAVX2(const int32_t* plist, int32_t numOfRows, int32_t ty
sum = _mm256_add_epi64(sum, extVal);
p += width;
}
// let sum up the final results
const int64_t* q = (const int64_t*)&sum;
pRes->sum.isum += q[0] + q[1] + q[2] + q[3];
for (int32_t j = 0; j < remainder; ++j) {
pRes->sum.isum += plist[j + rounds * width];
}
} else {
const uint32_t* p = (const uint32_t*)plist;
@ -228,16 +252,16 @@ static void i32VectorSumAVX2(const int32_t* plist, int32_t numOfRows, int32_t ty
sum = _mm256_add_epi64(sum, extVal);
p += width;
}
// let sum up the final results
const uint64_t* q = (const uint64_t*)&sum;
pRes->sum.usum += q[0] + q[1] + q[2] + q[3];
for (int32_t j = 0; j < remainder; ++j) {
pRes->sum.usum += (uint32_t)plist[j + rounds * width];
}
}
// let sum up the final results
const int64_t* q = (const int64_t*)&sum;
pRes->sum.isum += q[0] + q[1] + q[2] + q[3];
int32_t startIndex = rounds * width;
for (int32_t j = 0; j < remainder; ++j) {
pRes->sum.isum += plist[j + startIndex];
}
#endif
}
@ -262,13 +286,22 @@ static void i64VectorSumAVX2(const int64_t* plist, int32_t numOfRows, SAvgRes* p
}
// let sum up the final results
const int64_t* q = (const int64_t*)&sum;
pRes->sum.isum += q[0] + q[1] + q[2] + q[3];
if (type == TSDB_DATA_TYPE_BIGINT) {
const int64_t* q = (const int64_t*)&sum;
pRes->sum.isum += q[0] + q[1] + q[2] + q[3];
int32_t startIndex = rounds * width;
for (int32_t j = 0; j < remainder; ++j) {
pRes->sum.isum += plist[j + startIndex];
for (int32_t j = 0; j < remainder; ++j) {
pRes->sum.isum += plist[j + rounds * width];
}
} else {
const uint64_t* q = (const uint64_t*)&sum;
pRes->sum.usum += q[0] + q[1] + q[2] + q[3];
for (int32_t j = 0; j < remainder; ++j) {
pRes->sum.usum += (uint64_t)plist[j + rounds * width];
}
}
#endif
}
@ -502,7 +535,11 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
i8VectorSumAVX2(plist, numOfRows, type, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.usum += plist[i];
if (type == TSDB_DATA_TYPE_TINYINT) {
pAvgRes->sum.isum += plist[i];
} else {
pAvgRes->sum.usum += (uint8_t)plist[i];
}
}
}
break;
@ -517,7 +554,11 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
i16VectorSumAVX2(plist, numOfRows, type, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.isum += plist[i];
if (type == TSDB_DATA_TYPE_SMALLINT) {
pAvgRes->sum.isum += plist[i];
} else {
pAvgRes->sum.usum += (uint16_t)plist[i];
}
}
}
break;
@ -532,7 +573,11 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
i32VectorSumAVX2(plist, numOfRows, type, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.isum += plist[i];
if (type == TSDB_DATA_TYPE_INT) {
pAvgRes->sum.isum += plist[i];
} else {
pAvgRes->sum.usum += (uint32_t)plist[i];
}
}
}
break;
@ -547,7 +592,11 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
i64VectorSumAVX2(plist, numOfRows, pAvgRes);
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
pAvgRes->sum.isum += plist[i];
if (type == TSDB_DATA_TYPE_BIGINT) {
pAvgRes->sum.isum += plist[i];
} else {
pAvgRes->sum.isum += (uint64_t)plist[i];
}
}
}
break;

View File

@ -197,6 +197,7 @@ typedef struct SSyncNode {
int32_t configChangeNum;
int32_t hbSlowNum;
int32_t hbrSlowNum;
int32_t tmrRoutineNum;
bool isStart;

View File

@ -1037,6 +1037,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
pSyncNode->configChangeNum = 0;
pSyncNode->hbSlowNum = 0;
pSyncNode->hbrSlowNum = 0;
pSyncNode->tmrRoutineNum = 0;
sNTrace(pSyncNode, "sync open, node:%p", pSyncNode);
@ -1141,12 +1142,21 @@ void syncNodeClose(SSyncNode* pSyncNode) {
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
if ((pSyncNode->senders)[i] != NULL) {
sSTrace((pSyncNode->senders)[i], "snapshot sender destroy while close, data:%p", (pSyncNode->senders)[i]);
if (snapshotSenderIsStart((pSyncNode->senders)[i])) {
snapshotSenderStop((pSyncNode->senders)[i], false);
}
snapshotSenderDestroy((pSyncNode->senders)[i]);
(pSyncNode->senders)[i] = NULL;
}
}
if (pSyncNode->pNewNodeReceiver != NULL) {
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
}
snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
pSyncNode->pNewNodeReceiver = NULL;
}
@ -2457,8 +2467,12 @@ const char* syncStr(ESyncState state) {
return "candidate";
case TAOS_SYNC_STATE_LEADER:
return "leader";
default:
case TAOS_SYNC_STATE_ERROR:
return "error";
case TAOS_SYNC_STATE_OFFLINE:
return "offline";
default:
return "unknown";
}
}

View File

@ -52,7 +52,13 @@ static void syncNodeCleanConfigIndex(SSyncNode* ths) {
}
static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
sNInfo(ths, "timer routines");
ths->tmrRoutineNum++;
if (ths->tmrRoutineNum % 60 == 0 && ths->replicaNum > 1) {
sNInfo(ths, "timer routines");
} else {
sNTrace(ths, "timer routines");
}
// timer replicate
syncNodeReplicate(ths);

View File

@ -169,6 +169,7 @@ typedef struct {
char spi : 2;
char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset
uint64_t timestamp;
char user[TSDB_UNI_LEN];
uint32_t magicNum;
STraceId traceId;

View File

@ -758,6 +758,14 @@ static void cliSendCb(uv_write_t* req, int status) {
SCliConn* pConn = transReqQueueRemove(req);
if (pConn == NULL) return;
SCliMsg* pMsg = !transQueueEmpty(&pConn->cliMsgs) ? transQueueGet(&pConn->cliMsgs, 0) : NULL;
if (pMsg != NULL) {
int64_t cost = taosGetTimestampUs() - pMsg->st;
if (cost > 1000) {
tWarn("%s conn %p send cost:%dus, send exception", CONN_GET_INST_LABEL(pConn), pConn, (int)cost);
}
}
if (status == 0) {
tTrace("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
} else {
@ -806,6 +814,7 @@ void cliSend(SCliConn* pConn) {
pHead->traceId = pMsg->info.traceId;
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
}
pHead->timestamp = taosHton64(taosGetTimestampUs());
if (pHead->persist == 1) {
CONN_SET_PERSIST_BY_APP(pConn);
@ -1662,6 +1671,7 @@ int transReleaseCliHandle(void* handle) {
SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
cmsg->msg = tmsg;
cmsg->st = taosGetTimestampUs();
cmsg->type = Release;
cmsg->ctx = pCtx;

View File

@ -231,14 +231,29 @@ static bool uvHandleReq(SSvrConn* pConn) {
}
}
STraceId* trace = &pHead->traceId;
int64_t cost = taosGetTimestampUs() - taosNtoh64(pHead->timestamp);
static int64_t EXCEPTION_LIMIT_US = 100 * 1000;
if (pConn->status == ConnNormal && pHead->noResp == 0) {
transRefSrvHandle(pConn);
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d", transLabel(pTransInst), pConn,
TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen);
if (cost >= EXCEPTION_LIMIT_US) {
tGWarn("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception", transLabel(pTransInst),
pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost);
} else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus", transLabel(pTransInst), pConn,
TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost);
}
} else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, resp:%d, code:%d", transLabel(pTransInst), pConn,
TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp, transMsg.code);
if (cost >= EXCEPTION_LIMIT_US) {
tGWarn("%s conn %p %s received from %s, local info:%s, len:%d, resp:%d, code:%d, cost:%dus, recv exception",
transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp,
transMsg.code, (int)(cost));
} else {
tGWarn("%s conn %p %s received from %s, local info:%s, len:%d, resp:%d, code:%d, cost:%dus",
transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp,
transMsg.code, (int)(cost));
}
}
// pHead->noResp = 1,

View File

@ -1103,3 +1103,30 @@ void taosWinSocketInit() {
#else
#endif
}
uint64_t taosHton64(uint64_t val) {
#if defined(WINDOWS) || defined(DARWIN)
return ((val & 0x00000000000000ff) << 7 * 8) | ((val & 0x000000000000ff00) << 5 * 8) |
((val & 0x0000000000ff0000) << 3 * 8) | ((val & 0x00000000ff000000) << 1 * 8) |
((val & 0x000000ff00000000) >> 1 * 8) | ((val & 0x0000ff0000000000) >> 3 * 8) |
((val & 0x00ff000000000000) >> 5 * 8) | ((val & 0xff00000000000000) >> 7 * 8);
#else
if (__BYTE_ORDER == __LITTLE_ENDIAN) {
return (((uint64_t)htonl((int)((val << 32) >> 32))) << 32) | (unsigned int)htonl((int)(val >> 32));
} else if (__BYTE_ORDER == __BIG_ENDIAN) {
return val;
}
#endif
}
uint64_t taosNtoh64(uint64_t val) {
#if defined(WINDOWS) || defined(DARWIN)
return taosHton64(val);
#else
if (__BYTE_ORDER == __LITTLE_ENDIAN) {
return (((uint64_t)htonl((int)((val << 32) >> 32))) << 32) | (unsigned int)htonl((int)(val >> 32));
} else if (__BYTE_ORDER == __BIG_ENDIAN) {
return val;
}
#endif
}

View File

@ -554,17 +554,26 @@ int32_t taosClockGetTime(int clock_id, struct timespec *pTS) {
static SYSTEMTIME ss;
static LARGE_INTEGER offset;
ss.wYear = 1970;
ss.wMonth = 1;
ss.wDay = 1;
ss.wHour = 0;
ss.wMinute = 0;
ss.wSecond = 0;
ss.wMilliseconds = 0;
SystemTimeToFileTime(&ss, &ff);
offset.QuadPart = ff.dwHighDateTime;
offset.QuadPart <<= 32;
offset.QuadPart |= ff.dwLowDateTime;
static int8_t offsetInit = 0;
static volatile bool offsetInitFinished = false;
int8_t old = atomic_val_compare_exchange_8(&offsetInit, 0, 1);
if (0 == old) {
ss.wYear = 1970;
ss.wMonth = 1;
ss.wDay = 1;
ss.wHour = 0;
ss.wMinute = 0;
ss.wSecond = 0;
ss.wMilliseconds = 0;
SystemTimeToFileTime(&ss, &ff);
offset.QuadPart = ff.dwHighDateTime;
offset.QuadPart <<= 32;
offset.QuadPart |= ff.dwLowDateTime;
offsetInitFinished = true;
} else {
while (!offsetInitFinished)
; // Ensure initialization is completed.
}
GetSystemTimeAsFileTime(&f);
t.QuadPart = f.dwHighDateTime;

View File

@ -529,6 +529,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/leastsquares.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/length.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/length.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/limit.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/log.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/log.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/lower.py

View File

@ -15,155 +15,159 @@ from util.cluster import *
class TDTestCase:
def caseDescription(self):
'''
3.0 data compatibility test
case1: basedata version is 3.0.1.0
'''
return
def caseDescription(self):
'''
3.0 data compatibility test
case1: basedata version is 3.0.1.0
'''
return
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]
break
return buildPath
for root, dirs, files in os.walk(projPath):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]
break
return buildPath
def getCfgPath(self):
buildPath = self.getBuildPath()
selfPath = os.path.dirname(os.path.realpath(__file__))
def getCfgPath(self):
buildPath = self.getBuildPath()
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
cfgPath = buildPath + "/../sim/dnode1/cfg/"
else:
cfgPath = buildPath + "/../sim/dnode1/cfg/"
if ("community" in selfPath):
cfgPath = buildPath + "/../sim/dnode1/cfg/"
else:
cfgPath = buildPath + "/../sim/dnode1/cfg/"
return cfgPath
return cfgPath
def installTaosd(self,bPath,cPath):
# os.system(f"rmtaos && mkdir -p {self.getBuildPath()}/build/lib/temp && mv {self.getBuildPath()}/build/lib/libtaos.so* {self.getBuildPath()}/build/lib/temp/ ")
# os.system(f" mv {bPath}/build {bPath}/build_bak ")
# os.system(f"mv {self.getBuildPath()}/build/lib/libtaos.so {self.getBuildPath()}/build/lib/libtaos.so_bak ")
# os.system(f"mv {self.getBuildPath()}/build/lib/libtaos.so.1 {self.getBuildPath()}/build/lib/libtaos.so.1_bak ")
def installTaosd(self,bPath,cPath):
# os.system(f"rmtaos && mkdir -p {self.getBuildPath()}/build/lib/temp && mv {self.getBuildPath()}/build/lib/libtaos.so* {self.getBuildPath()}/build/lib/temp/ ")
# os.system(f" mv {bPath}/build {bPath}/build_bak ")
# os.system(f"mv {self.getBuildPath()}/build/lib/libtaos.so {self.getBuildPath()}/build/lib/libtaos.so_bak ")
# os.system(f"mv {self.getBuildPath()}/build/lib/libtaos.so.1 {self.getBuildPath()}/build/lib/libtaos.so.1_bak ")
packagePath="/usr/local/src/"
packageName="TDengine-server-3.0.1.0-Linux-x64.tar.gz"
os.system(f"cd {packagePath} && tar xvf TDengine-server-3.0.1.0-Linux-x64.tar.gz && cd TDengine-server-3.0.1.0 && ./install.sh -e no " )
tdDnodes.stop(1)
print(f"start taosd: nohup taosd -c {cPath} & ")
os.system(f" nohup taosd -c {cPath} & " )
sleep(1)
packagePath="/usr/local/src/"
packageName="TDengine-server-3.0.1.0-Linux-x64.tar.gz"
os.system(f"cd {packagePath} && tar xvf TDengine-server-3.0.1.0-Linux-x64.tar.gz && cd TDengine-server-3.0.1.0 && ./install.sh -e no " )
tdDnodes.stop(1)
print(f"start taosd: nohup taosd -c {cPath} & ")
os.system(f" nohup taosd -c {cPath} & " )
sleep(1)
def buildTaosd(self,bPath):
# os.system(f"mv {bPath}/build_bak {bPath}/build ")
os.system(f" cd {bPath} && make install ")
def buildTaosd(self,bPath):
# os.system(f"mv {bPath}/build_bak {bPath}/build ")
os.system(f" cd {bPath} && make install ")
def run(self):
bPath=self.getBuildPath()
cPath=self.getCfgPath()
dbname = "test"
stb = f"{dbname}.meters"
self.installTaosd(bPath,cPath)
os.system("echo 'debugFlag 143' > /etc/taos/taos.cfg ")
tableNumbers=100
recordNumbers1=100
recordNumbers2=1000
tdsqlF=tdCom.newTdSql()
print(tdsqlF)
tdsqlF.query(f"SELECT SERVER_VERSION();")
print(tdsqlF.query(f"SELECT SERVER_VERSION();"))
oldServerVersion=tdsqlF.queryResult[0][0]
tdLog.info(f"Base server version is {oldServerVersion}")
tdsqlF.query(f"SELECT CLIENT_VERSION();")
# the oldClientVersion can't be updated in the same python process,so the version is new compiled verison
oldClientVersion=tdsqlF.queryResult[0][0]
tdLog.info(f"Base client version is {oldClientVersion}")
def run(self):
print(f"start taosd run")
bPath=self.getBuildPath()
cPath=self.getCfgPath()
dbname = "test"
stb = f"{dbname}.meters"
self.installTaosd(bPath,cPath)
os.system("echo 'debugFlag 143' > /etc/taos/taos.cfg ")
tableNumbers=100
recordNumbers1=100
recordNumbers2=1000
#tdsqlF=tdCom.newTdSql()
#print(tdsqlF)
tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{oldServerVersion}")
tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
sleep(3)
oldServerVersion = '3.0.1.0'
#tdsqlF.query(f"SELECT SERVER_VERSION();")
#print(tdsqlF.query(f"SELECT SERVER_VERSION();"))
#oldServerVersion=tdsqlF.queryResult[0][0]
#tdLog.info(f"Base server version is {oldServerVersion}")
#tdsqlF.query(f"SELECT CLIENT_VERSION();")
#
## the oldClientVersion can't be updated in the same python process,so the version is new compiled verison
#oldClientVersion=tdsqlF.queryResult[0][0]
# tdsqlF.query(f"select count(*) from {stb}")
# tdsqlF.checkData(0,0,tableNumbers*recordNumbers1)
os.system("pkill taosd")
sleep(2)
#tdLog.info(f"Base client version is {oldClientVersion}")
print(f"start taosd: nohup taosd -c {cPath} & ")
os.system(f" nohup taosd -c {cPath} & " )
sleep(10)
tdLog.info(" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y ")
os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y")
os.system("pkill -9 taosd")
tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{oldServerVersion}")
tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
sleep(3)
# tdsqlF.query(f"select count(*) from {stb}")
# tdsqlF.checkData(0,0,tableNumbers*recordNumbers1)
os.system("pkill taosd")
sleep(2)
print(f"start taosd: nohup taosd -c {cPath} & ")
os.system(f" nohup taosd -c {cPath} & " )
sleep(10)
tdLog.info(" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y ")
os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y")
os.system("pkill -9 taosd")
tdLog.printNoPrefix("==========step2:update new version ")
self.buildTaosd(bPath)
tdDnodes.start(1)
sleep(1)
tdsql=tdCom.newTdSql()
print(tdsql)
tdLog.printNoPrefix("==========step2:update new version ")
self.buildTaosd(bPath)
tdDnodes.start(1)
sleep(1)
tdsql=tdCom.newTdSql()
print(tdsql)
tdsql.query(f"SELECT SERVER_VERSION();")
nowServerVersion=tdsql.queryResult[0][0]
tdLog.info(f"New server version is {nowServerVersion}")
tdsql.query(f"SELECT CLIENT_VERSION();")
nowClientVersion=tdsql.queryResult[0][0]
tdLog.info(f"New client version is {nowClientVersion}")
tdsql.query(f"SELECT SERVER_VERSION();")
nowServerVersion=tdsql.queryResult[0][0]
tdLog.info(f"New server version is {nowServerVersion}")
tdsql.query(f"SELECT CLIENT_VERSION();")
nowClientVersion=tdsql.queryResult[0][0]
tdLog.info(f"New client version is {nowClientVersion}")
tdLog.printNoPrefix(f"==========step3:prepare and check data in new version-{nowServerVersion}")
tdsql.query(f"select count(*) from {stb}")
tdsql.checkData(0,0,tableNumbers*recordNumbers1)
os.system(f"taosBenchmark -t {tableNumbers} -n {recordNumbers2} -y ")
tdsql.query(f"select count(*) from {stb}")
tdsql.checkData(0,0,tableNumbers*recordNumbers2)
tdLog.printNoPrefix(f"==========step3:prepare and check data in new version-{nowServerVersion}")
tdsql.query(f"select count(*) from {stb}")
tdsql.checkData(0,0,tableNumbers*recordNumbers1)
os.system(f"taosBenchmark -t {tableNumbers} -n {recordNumbers2} -y ")
tdsql.query(f"select count(*) from {stb}")
tdsql.checkData(0,0,tableNumbers*recordNumbers2)
tdsql=tdCom.newTdSql()
tdLog.printNoPrefix(f"==========step4:verify backticks in taos Sql-TD18542")
tdsql.execute("drop database if exists db")
tdsql.execute("create database db")
tdsql.execute("use db")
tdsql.execute("create stable db.stb1 (ts timestamp, c1 int) tags (t1 int);")
tdsql.execute("insert into db.ct1 using db.stb1 TAGS(1) values(now(),11);")
tdsql.error(" insert into `db.ct2` using db.stb1 TAGS(9) values(now(),11);")
tdsql.error(" insert into db.`db.ct2` using db.stb1 TAGS(9) values(now(),11);")
tdsql.execute("insert into `db`.ct3 using db.stb1 TAGS(3) values(now(),13);")
tdsql.query("select * from db.ct3")
tdsql.checkData(0,1,13)
tdsql.execute("insert into db.`ct4` using db.stb1 TAGS(4) values(now(),14);")
tdsql.query("select * from db.ct4")
tdsql.checkData(0,1,14)
tdsql.query("describe information_schema.ins_databases;")
qRows=tdsql.queryRows
for i in range(qRows) :
if tdsql.queryResult[i][0]=="retentions" :
return True
else:
return False
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdsql=tdCom.newTdSql()
tdLog.printNoPrefix(f"==========step4:verify backticks in taos Sql-TD18542")
tdsql.execute("drop database if exists db")
tdsql.execute("create database db")
tdsql.execute("use db")
tdsql.execute("create stable db.stb1 (ts timestamp, c1 int) tags (t1 int);")
tdsql.execute("insert into db.ct1 using db.stb1 TAGS(1) values(now(),11);")
tdsql.error(" insert into `db.ct2` using db.stb1 TAGS(9) values(now(),11);")
tdsql.error(" insert into db.`db.ct2` using db.stb1 TAGS(9) values(now(),11);")
tdsql.execute("insert into `db`.ct3 using db.stb1 TAGS(3) values(now(),13);")
tdsql.query("select * from db.ct3")
tdsql.checkData(0,1,13)
tdsql.execute("insert into db.`ct4` using db.stb1 TAGS(4) values(now(),14);")
tdsql.query("select * from db.ct4")
tdsql.checkData(0,1,14)
tdsql.query("describe information_schema.ins_databases;")
qRows=tdsql.queryRows
for i in range(qRows) :
if tdsql.queryResult[i][0]=="retentions" :
return True
else:
return False
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,353 @@
import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
# from tmqCommon import *
class TDTestCase:
def __init__(self):
self.vgroups = 2
self.ctbNum = 10
self.rowsPerTbl = 10000
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def create_database(self,tsql, dbName,dropFlag=1,vgroups=2,replica=1):
if dropFlag == 1:
tsql.execute("drop database if exists %s"%(dbName))
tsql.execute("create database if not exists %s vgroups %d replica %d"%(dbName, vgroups, replica))
tdLog.debug("complete to create database %s"%(dbName))
return
def create_stable(self,tsql, paraDict):
colString = tdCom.gen_column_type_str(colname_prefix=paraDict["colPrefix"], column_elm_list=paraDict["colSchema"])
tagString = tdCom.gen_tag_type_str(tagname_prefix=paraDict["tagPrefix"], tag_elm_list=paraDict["tagSchema"])
# tdLog.debug(colString)
# tdLog.debug(tagString)
sqlString = f"create table if not exists %s.%s (%s) tags (%s)"%(paraDict["dbName"], paraDict["stbName"], colString, tagString)
tdLog.debug("%s"%(sqlString))
tsql.execute(sqlString)
return
def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1,ctbStartIdx=0):
for i in range(ctbNum):
sqlString = "create table %s.%s%d using %s.%s tags(%d, 'tb%d', 'tb%d', %d, %d, %d)"%(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx)
tsql.execute(sqlString)
tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName))
return
def insert_data(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs,tsStep):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
# t = time.time()
# startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
rowsBatched = 0
sql += " %s%d values "%(ctbPrefix,i)
for j in range(rowsPerTbl):
if (i < ctbNum/2):
sql += "(%d, %d, %d, %d,%d,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10, j%10, j%10)
else:
sql += "(%d, %d, NULL, %d,NULL,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10)
rowsBatched += 1
if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
# print("===sql: %s"%(sql))
rowsBatched = 0
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(ctbPrefix,i)
else:
sql = "insert into "
#end sql
if sql != pre_insert:
#print("insert sql:%s"%sql)
tsql.execute(sql)
# print("===sql: %s"%(sql))
tdLog.debug("insert data ............ [OK]")
return
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'lm2_db0',
'dropFlag': 1,
'vgroups': 2,
'stbName': 'lm2_stb0',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'FLOAT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'smallint', 'count':1},{'type': 'tinyint', 'count':1},{'type': 'bool', 'count':1},{'type': 'binary', 'len':10, 'count':1},{'type': 'nchar', 'len':10, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'nchar', 'len':20, 'count':1},{'type': 'binary', 'len':20, 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'smallint', 'count':1},{'type': 'DOUBLE', 'count':1}],
'ctbPrefix': 'lm2_tb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 3000,
'startTs': 1537146000000,
'tsStep': 600000}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("create database")
self.create_database(tsql=tdSql, dbName=paraDict["dbName"], dropFlag=paraDict["dropFlag"], vgroups=paraDict["vgroups"], replica=self.replicaVar)
tdLog.info("create stb")
self.create_stable(tsql=tdSql, paraDict=paraDict)
tdLog.info("create child tables")
self.create_ctable(tsql=tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict["ctbStartIdx"])
self.insert_data(tsql=tdSql, dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],startTs=paraDict["startTs"],tsStep=paraDict["tsStep"])
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'lm2_db0',
'dropFlag': 1,
'vgroups': 2,
'stbName': 'lm2_stb0',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'FLOAT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'smallint', 'count':1},{'type': 'tinyint', 'count':1},{'type': 'bool', 'count':1},{'type': 'binary', 'len':10, 'count':1},{'type': 'nchar', 'len':10, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'nchar', 'len':20, 'count':1},{'type': 'binary', 'len':20, 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'smallint', 'count':1},{'type': 'DOUBLE', 'count':1}],
'ctbPrefix': 'lm2_tb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 3000,
'startTs': 1537146000000,
'tsStep': 600000}
val1 = 1
val2 = paraDict["ctbNum"] - 1
# select count(*), t1, t2, t3, t4, t5, t6 from $stb where t1 > $val1 and t1 < $val2 group by t1, t2, t3, t4, t5, t6 order by t1 asc limit 1 offset 0
sqlStr = f"select count(*), t1, t2, t3, t4, t5, t6 from %s where t1 > %d and t1 < %d group by t1, t2, t3, t4, t5, t6 order by t1 asc limit 1 offset 0"%(paraDict["stbName"], val1, val2)
print("====sql:%s"%(sqlStr))
tdSql.query(sqlStr)
tdSql.checkRows(1)
tdSql.checkData(0, 0, paraDict["rowsPerTbl"])
tdSql.checkData(0, 1, 2)
tdSql.checkData(0, 2, "tb2")
tdSql.checkData(0, 3, "tb2")
tdSql.checkData(0, 4, 2)
tdSql.checkData(0, 5, 2)
# select count(*), t3, t4 from $stb where t2 like '%' and t1 > 2 and t1 < 5 group by t3, t4 order by t3 desc limit 2 offset 0
sqlStr = f"select count(*), t3, t4 from %s where t2 like '%%' and t1 > 2 and t1 < 5 group by t3, t4 order by t3 desc limit 2 offset 0"%(paraDict["stbName"])
print("====sql:%s"%(sqlStr))
tdSql.query(sqlStr)
tdSql.checkRows(2)
tdSql.checkData(0, 0, paraDict["rowsPerTbl"])
tdSql.checkData(0, 1, "tb4")
tdSql.checkData(0, 2, 4)
tdSql.checkData(1, 1, "tb3")
tdSql.checkData(1, 2, 3)
# select count(*) from $stb where t2 like '%' and t1 > 2 and t1 < 5 group by t3, t4 order by t3 desc limit 1 offset 1
sqlStr = f"select count(*) from %s where t2 like '%%' and t1 > 2 and t1 < 5 group by t3, t4 order by t3 desc limit 1 offset 1"%(paraDict["stbName"])
print("====sql:%s"%(sqlStr))
tdSql.query(sqlStr)
tdSql.checkRows(1)
## TBASE-348
tdSql.error(f"select count(*) from %s where t1 like 1"%(paraDict["stbName"]))
ts0 = paraDict["startTs"]
delta = paraDict["tsStep"]
tsu = paraDict["rowsPerTbl"] * delta
tsu = tsu - delta
tsu = tsu + ts0
tb = paraDict["ctbPrefix"] + '0'
# select _wstart, max(c1) from $tb where ts >= $ts0 and ts <= $tsu interval(5m) fill(value, -1) limit 10 offset 1
sqlStr = f"select _wstart, max(c1) from %s where ts >= %d and ts <= %d interval(5m) fill(value, -1) limit 10 offset 1"%(tb, ts0, tsu)
print("====sql:%s"%(sqlStr))
tdSql.query(sqlStr)
tdSql.checkRows(10)
tdSql.checkData(0, 0, "18-09-17 09:05:00.000")
tdSql.checkData(0, 1, -1)
tdSql.checkData(1, 1, 1)
tdSql.checkData(9, 0, "18-09-17 09:50:00.000")
tdSql.checkData(9, 1, 5)
tb5 = paraDict["ctbPrefix"] + '5'
sqlStr = f"select max(c1), min(c2) from %s where ts >= %d and ts <= %d interval(5m) fill(value, -1, -2) limit 10 offset 1"%(tb5, ts0, tsu)
print("====sql:%s"%(sqlStr))
tdSql.query(sqlStr)
tdSql.checkRows(10)
tdSql.checkData(0, 0, -1)
tdSql.checkData(0, 1, -2)
tdSql.checkData(1, 0, 1)
tdSql.checkData(1, 1, -2)
tdSql.checkData(9, 0, 5)
tdSql.checkData(9, 1, -2)
### [TBASE-350]
## tb + interval + fill(value) + limit offset
tb = paraDict["ctbPrefix"] + '0'
limit = paraDict["rowsPerTbl"]
offset = limit / 2
sqlStr = f"select max(c1), min(c2), sum(c3), avg(c4), stddev(c5), spread(c6), first(c7), last(c8), first(c9) from %s where ts >= %d and ts <= %d interval(5m) fill(value, -1, -2 ,-3, -4 , -5, -6 ,-7 ,'-8', '-9') limit %d offset %d"%(tb, ts0, tsu, limit, offset)
print("====sql:%s"%(sqlStr))
tdSql.query(sqlStr)
tdSql.checkRows(limit)
tdSql.checkData(0, 1, 0)
sqlStr = f"select max(c1) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1000) limit 8200"
print("====sql:%s"%(sqlStr))
tdSql.query(sqlStr)
tdSql.checkRows(8200)
sqlStr = f"select max(c1) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1000) limit 100000;"
print("====sql:%s"%(sqlStr))
tdSql.query(sqlStr)
sqlStr = f"select max(c1) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1000) limit 10 offset 8190;"
print("====sql:%s"%(sqlStr))
tdSql.query(sqlStr)
tdSql.checkRows(10)
tdSql.checkData(0, 0, 5)
tdSql.checkData(1, 0, -1000)
tdSql.checkData(2, 0, 6)
tdSql.checkData(3, 0, -1000)
sqlStr = f"select max(c1) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1000) limit 10 offset 10001;"
print("====sql:%s"%(sqlStr))
tdSql.query(sqlStr)
tdSql.checkRows(10)
tdSql.checkData(0, 0, -1000)
tdSql.checkData(1, 0, 1)
tdSql.checkData(2, 0, -1000)
tdSql.checkData(3, 0, 2)
sqlStr = f"select max(c1) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1000) limit 10000 offset 10001;"
print("====sql:%s"%(sqlStr))
tdSql.query(sqlStr)
tdSql.checkRows(9998)
sqlStr = f"select max(c1) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1000) limit 100 offset 20001;"
print("====sql:%s"%(sqlStr))
tdSql.query(sqlStr)
tdSql.checkRows(0)
# tb + interval + fill(linear) + limit offset
limit = paraDict["rowsPerTbl"]
offset = limit / 2
sqlStr = f"select _wstart,max(c1), min(c2), sum(c3), avg(c4), stddev(c5), spread(c6), first(c7), last(c8), first(c9) from %s where ts >= %d and ts <= %d interval(5m) fill(linear) limit %d offset %d"%(tb,ts0,tsu,limit, offset)
print("====sql:%s"%(sqlStr))
tdSql.query(sqlStr)
tdSql.checkRows(limit)
tdSql.checkData(0, 1, 0)
tdSql.checkData(1, 1, 0)
tdSql.checkData(1, 3, 0.500000000)
tdSql.checkData(3, 5, 0.000000000)
tdSql.checkData(4, 6, 0.000000000)
tdSql.checkData(4, 7, 1)
tdSql.checkData(5, 7, None)
tdSql.checkData(6, 8, "binary3")
tdSql.checkData(7, 9, None)
limit = paraDict["rowsPerTbl"]
offset = limit / 2
sqlStr = f"select max(c1), min(c2), sum(c3), avg(c4), stddev(c5), spread(c6), first(c7), last(c8), first(c9) from %s where ts >= %d and ts <= %d interval(5m) fill(prev) limit %d offset %d"%(tb,ts0,tsu,limit, offset)
print("====sql:%s"%(sqlStr))
tdSql.query(sqlStr)
tdSql.checkRows(limit)
limit = paraDict["rowsPerTbl"]
offset = limit / 2 + 10
sqlStr = f"select _wstart,max(c1), min(c2), sum(c3), avg(c4), stddev(c5), spread(c6), first(c7), last(c8), first(c9) from %s where ts >= %d and ts <= %d and c1 = 5 interval(5m) fill(value, -1, -2 ,-3, -4 , -5, -6 ,-7 ,'-8', '-9') limit %d offset %d"%(tb,ts0,tsu,limit, offset)
print("====sql:%s"%(sqlStr))
tdSql.query(sqlStr)
tdSql.checkRows(limit)
tdSql.checkData(0, 1, 5)
tdSql.checkData(0, 2, 5)
tdSql.checkData(0, 3, 5.000000000)
tdSql.checkData(0, 4, 5.000000000)
tdSql.checkData(0, 5, 0.000000000)
tdSql.checkData(0, 7, 1)
tdSql.checkData(0, 8, "binary5")
tdSql.checkData(0, 9, "nchar5")
tdSql.checkData(1, 8, None)
tdSql.checkData(1, 9, None)
limit = paraDict["rowsPerTbl"]
offset = limit * 2 - 11
sqlStr = f"select _wstart,max(c1), min(c2), sum(c3), avg(c4), stddev(c5), spread(c6), first(c7), last(c8), first(c9) from %s where ts >= %d and ts <= %d and c1 = 5 interval(5m) fill(value, -1, -2 ,-3, -4 , -5, -6 ,-7 ,'-8', '-9') limit %d offset %d"%(tb,ts0,tsu,limit, offset)
print("====sql:%s"%(sqlStr))
tdSql.query(sqlStr)
tdSql.checkRows(10)
tdSql.checkData(0, 1, -1)
tdSql.checkData(0, 2, -2)
tdSql.checkData(1, 1, 5)
tdSql.checkData(1, 2, 5)
tdSql.checkData(1, 3, 5.000000000)
tdSql.checkData(1, 5, 0.000000000)
tdSql.checkData(1, 6, 0.000000000)
tdSql.checkData(1, 8, "binary5")
tdSql.checkData(1, 9, "nchar5")
### [TBASE-350]
## stb + interval + fill + group by + limit offset
sqlStr = f"select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 partition by t1 interval(5m) fill(value, -1, -2, -3, -4 ,-7 ,'-8', '-9') limit 2 offset 10"
print("====sql:%s"%(sqlStr))
tdSql.query(sqlStr)
tdSql.checkRows(2)
limit = 5
offset = paraDict["rowsPerTbl"] * 2
offset = offset - 2
sqlStr = f"select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 partition by t1 interval(5m) fill(value, -1, -2, -3, -4 ,-7 ,'-8', '-9') order by t1 limit %d offset %d"%(limit, offset)
print("====sql:%s"%(sqlStr))
tdSql.query(sqlStr)
tdSql.checkRows(1)
tdSql.checkData(0, 0, 9)
tdSql.checkData(0, 1, 9)
tdSql.checkData(0, 2, 9.000000000)
tdSql.checkData(0, 3, 9.000000000)
tdSql.checkData(0, 4, 1)
tdSql.checkData(0, 5, "binary9")
tdSql.checkData(0, 6, "nchar9")
#add one more test case
sqlStr = f"select max(c1), last(c8) from lm2_db0.lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(linear) limit 10 offset 4089;"
tdLog.printNoPrefix("======== test case 1 end ...... ")
def run(self):
tdSql.prepare()
self.prepareTestEnv()
self.tmqCase1()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())