Merge branch '3.0' into cpwu/3.0

This commit is contained in:
cpwu 2022-07-05 15:18:03 +08:00
commit 95cac2a9b6
20 changed files with 2112 additions and 509 deletions

View File

@ -174,6 +174,8 @@ cmake .. -G "NMake Makefiles"
nmake
```
如果你使用的是 Visual Studio 2022 版本, 脚本 `vcvarsall.bat` 的默认安装路径是 `C:\Program Files\Microsoft Visual Studio\2022\Community\VC\Auxiliary\Build\vcvarsall.bat`
### Mac OS X 系统
安装 Xcode 命令行工具和 cmake. 在 Catalina 和 Big Sur 操作系统上,需要安装 XCode 11.4+ 版本。

View File

@ -136,7 +136,7 @@ cmake .. -DCPUTYPE=mips64 && cmake --build .
### On Windows platform
If you use the Visual Studio 2013, please open a command window by executing "cmd.exe".
If you use Visual Studio 2013, please open a command window by executing "cmd.exe".
Please specify "amd64" for 64 bits Windows or specify "x86" is for 32 bits Windows when you execute vcvarsall.bat.
```cmd
mkdir debug && cd debug
@ -145,7 +145,7 @@ cmake .. -G "NMake Makefiles"
nmake
```
If you use the Visual Studio 2019 or 2017:
If you use Visual Studio 2019 or 2017:
please open a command window by executing "cmd.exe".
Please specify "x64" for 64 bits Windows or specify "x86" is for 32 bits Windows when you execute vcvarsall.bat.
@ -164,6 +164,8 @@ cmake .. -G "NMake Makefiles"
nmake
```
If you use Visual Studio 2022, the only change is the default path of `vcvarsall.bat`, which is `C:\Program Files\Microsoft Visual Studio\2022\Community\VC\Auxiliary\Build\vcvarsall.bat`.
### On Mac OS X platform
Please install XCode command line tools and cmake. Verified with XCode 11.4+ on Catalina and Big Sur.

View File

@ -118,6 +118,7 @@ execute_process(COMMAND "${CMAKE_COMMAND}" --build .
# ================================================================================================
# googletest
if(${BUILD_TEST})
set(gtest_force_shared_crt ON CACHE BOOL "" FORCE)
add_subdirectory(googletest EXCLUDE_FROM_ALL)
target_include_directories(
gtest
@ -259,7 +260,7 @@ if(${BUILD_MSVCREGEX})
SET_TARGET_PROPERTIES(msvcregex PROPERTIES OUTPUT_NAME msvcregex)
endif(${BUILD_MSVCREGEX})
# msvcregex
# wcwidth
if(${BUILD_WCWIDTH})
add_library(wcwidth STATIC "")
target_sources(wcwidth

View File

@ -719,8 +719,10 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
ASSERT(numOfElem >= 0);
pAvgRes->count += numOfElem;
if (IS_INTEGER_TYPE(type)) {
if (IS_SIGNED_NUMERIC_TYPE(type)) {
pAvgRes->sum.isum += pAgg->sum;
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
pAvgRes->sum.usum += pAgg->sum;
} else if (IS_FLOAT_TYPE(type)) {
pAvgRes->sum.dsum += GET_DOUBLE_VAL((const char*)&(pAgg->sum));
}
@ -784,6 +786,64 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
break;
}
case TSDB_DATA_TYPE_UTINYINT: {
uint8_t* plist = (uint8_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pAvgRes->count += 1;
pAvgRes->sum.usum += plist[i];
}
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
uint16_t* plist = (uint16_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pAvgRes->count += 1;
pAvgRes->sum.usum += plist[i];
}
break;
}
case TSDB_DATA_TYPE_UINT: {
uint32_t* plist = (uint32_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pAvgRes->count += 1;
pAvgRes->sum.usum += plist[i];
}
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
uint64_t* plist = (uint64_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pAvgRes->count += 1;
pAvgRes->sum.usum += plist[i];
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float* plist = (float*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
@ -825,8 +885,10 @@ _avg_over:
static void avgTransferInfo(SAvgRes* pInput, SAvgRes* pOutput) {
pOutput->type = pInput->type;
if (IS_INTEGER_TYPE(pOutput->type)) {
if (IS_SIGNED_NUMERIC_TYPE(pOutput->type)) {
pOutput->sum.isum += pInput->sum.isum;
} else if (IS_UNSIGNED_NUMERIC_TYPE(pOutput->type)) {
pOutput->sum.usum += pInput->sum.usum;
} else {
pOutput->sum.dsum += pInput->sum.dsum;
}
@ -900,6 +962,22 @@ int32_t avgInvertFunction(SqlFunctionCtx* pCtx) {
LIST_AVG_N(pAvgRes->sum.isum, int64_t);
break;
}
case TSDB_DATA_TYPE_UTINYINT: {
LIST_AVG_N(pAvgRes->sum.usum, uint8_t);
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
LIST_AVG_N(pAvgRes->sum.usum, uint16_t);
break;
}
case TSDB_DATA_TYPE_UINT: {
LIST_AVG_N(pAvgRes->sum.usum, uint32_t);
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
LIST_AVG_N(pAvgRes->sum.usum, uint64_t);
break;
}
case TSDB_DATA_TYPE_FLOAT: {
LIST_AVG_N(pAvgRes->sum.dsum, float);
break;
@ -925,8 +1003,10 @@ int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SAvgRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
if (IS_INTEGER_TYPE(type)) {
if (IS_SIGNED_NUMERIC_TYPE(type)) {
pDBuf->sum.isum += pSBuf->sum.isum;
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
pDBuf->sum.usum += pSBuf->sum.usum;
} else {
pDBuf->sum.dsum += pSBuf->sum.dsum;
}
@ -941,8 +1021,10 @@ int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t type = pAvgRes->type;
if (IS_INTEGER_TYPE(type)) {
if (IS_SIGNED_NUMERIC_TYPE(type)) {
pAvgRes->result = pAvgRes->sum.isum / ((double)pAvgRes->count);
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
pAvgRes->result = pAvgRes->sum.usum / ((double)pAvgRes->count);
} else {
pAvgRes->result = pAvgRes->sum.dsum / ((double)pAvgRes->count);
}

View File

@ -1220,6 +1220,7 @@ static int32_t qnodeSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)qndSplFindSplitNode, &info)) {
return TSDB_CODE_SUCCESS;
}
((SScanLogicNode*)info.pSplitNode)->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;
int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType);
if (TSDB_CODE_SUCCESS == code) {
SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, info.pSplitNode, 0);

View File

@ -65,6 +65,7 @@ int32_t syncInit() {
syncCleanUp();
ret = -1;
} else {
sDebug("sync rsetId:%" PRId64 " is open", tsNodeRefId);
ret = syncEnvStart();
}
}
@ -77,6 +78,7 @@ void syncCleanUp() {
ASSERT(ret == 0);
if (tsNodeRefId != -1) {
sDebug("sync rsetId:%" PRId64 " is closed", tsNodeRefId);
taosCloseRef(tsNodeRefId);
tsNodeRefId = -1;
}
@ -96,6 +98,7 @@ int64_t syncOpen(const SSyncInfo* pSyncInfo) {
return -1;
}
sDebug("vgId:%d, rid:%" PRId64 " is added to rsetId:%" PRId64, pSyncInfo->vgId, pSyncNode->rid, tsNodeRefId);
return pSyncNode->rid;
}
@ -136,13 +139,14 @@ void syncStartStandBy(int64_t rid) {
void syncStop(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return;
}
if (pSyncNode == NULL) return;
int32_t vgId = pSyncNode->vgId;
syncNodeClose(pSyncNode);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
taosRemoveRef(tsNodeRefId, rid);
sDebug("vgId:%d, rid:%" PRId64 " is removed from rsetId:%" PRId64, vgId, rid, tsNodeRefId);
}
int32_t syncSetStandby(int64_t rid) {

View File

@ -40,8 +40,8 @@ struct SBTree {
#define TDB_BTREE_PAGE_IS_ROOT(PAGE) (TDB_BTREE_PAGE_GET_FLAGS(PAGE) & TDB_BTREE_ROOT)
#define TDB_BTREE_PAGE_IS_LEAF(PAGE) (TDB_BTREE_PAGE_GET_FLAGS(PAGE) & TDB_BTREE_LEAF)
#define TDB_BTREE_PAGE_IS_OVFL(PAGE) (TDB_BTREE_PAGE_GET_FLAGS(PAGE) & TDB_BTREE_OVFL)
#define TDB_BTREE_ASSERT_FLAG(flags) \
ASSERT(TDB_FLAG_IS(flags, TDB_BTREE_ROOT) || TDB_FLAG_IS(flags, TDB_BTREE_LEAF) || \
#define TDB_BTREE_ASSERT_FLAG(flags) \
ASSERT(TDB_FLAG_IS(flags, TDB_BTREE_ROOT) || TDB_FLAG_IS(flags, TDB_BTREE_LEAF) || \
TDB_FLAG_IS(flags, TDB_BTREE_ROOT | TDB_BTREE_LEAF) || TDB_FLAG_IS(flags, 0) || \
TDB_FLAG_IS(flags, TDB_BTREE_OVFL))
@ -58,7 +58,7 @@ typedef struct {
static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2);
static int tdbBtreeOpenImpl(SBTree *pBt);
//static int tdbBtreeInitPage(SPage *pPage, void *arg, int init);
// static int tdbBtreeInitPage(SPage *pPage, void *arg, int init);
static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const void *pVal, int vLen, SCell *pCell,
int *szCell, TXN *pTxn, SBTree *pBt);
static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pDecoder, TXN *pTxn, SBTree *pBt);
@ -321,7 +321,7 @@ static int tdbBtreeOpenImpl(SBTree *pBt) {
{
// 1. TODO: Search the main DB to check if the DB exists
ret = tdbPagerOpenDB(pBt->pPager, &pgno, true);
ret = tdbPagerOpenDB(pBt->pPager, &pgno, true, pBt);
ASSERT(ret == 0);
}
@ -721,7 +721,8 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
int szNewCell;
SPgno pgno;
pgno = TDB_PAGE_PGNO(pNews[iNew]);
tdbBtreeEncodeCell(pParent, cd.pKey, cd.kLen, (void *)&pgno, sizeof(SPgno), pNewCell, &szNewCell, pTxn, pBt);
tdbBtreeEncodeCell(pParent, cd.pKey, cd.kLen, (void *)&pgno, sizeof(SPgno), pNewCell, &szNewCell, pTxn,
pBt);
tdbPageInsertCell(pParent, sIdx++, pNewCell, szNewCell, 0);
tdbOsFree(pNewCell);
}
@ -916,10 +917,10 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
int surplus = minLocal + (nPayload + nHeader - minLocal) % (maxLocal - sizeof(SPgno));
int nLocal = surplus <= maxLocal ? surplus : minLocal;
//int ofpCap = tdbPageCapacity(pBt->pageSize, sizeof(SIntHdr));
// int ofpCap = tdbPageCapacity(pBt->pageSize, sizeof(SIntHdr));
// fetch a new ofp and make it dirty
SPgno pgno = 0;
SPgno pgno = 0;
SPage *ofp, *nextOfp;
ret = tdbFetchOvflPage(&pgno, &ofp, pTxn, pBt);
@ -942,8 +943,8 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
nLeft -= kLen;
// pack partial val to local if any space left
if (nLocal > kLen + 4) {
memcpy(pCell + nHeader + kLen, pVal, nLocal - kLen - sizeof(SPgno));
nLeft -= nLocal - kLen - sizeof(SPgno);
memcpy(pCell + nHeader + kLen, pVal, nLocal - kLen - sizeof(SPgno));
nLeft -= nLocal - kLen - sizeof(SPgno);
}
// pack nextPgno
@ -951,132 +952,132 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
// pack left val data to ovpages
do {
lastPage = 0;
if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
bytes = nLeft;
lastPage = 1;
} else {
bytes = ofp->maxLocal - sizeof(SPgno);
}
lastPage = 0;
if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
bytes = nLeft;
lastPage = 1;
} else {
bytes = ofp->maxLocal - sizeof(SPgno);
}
// fetch next ofp if not last page
if (!lastPage) {
// fetch a new ofp and make it dirty
ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
if (ret < 0) {
tdbFree(pBuf);
return -1;
}
} else {
pgno = 0;
}
// fetch next ofp if not last page
if (!lastPage) {
// fetch a new ofp and make it dirty
ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
if (ret < 0) {
tdbFree(pBuf);
return -1;
}
} else {
pgno = 0;
}
memcpy(pBuf, ((SCell *)pVal) + vLen - nLeft, bytes);
memcpy(pBuf + bytes, &pgno, sizeof(pgno));
memcpy(pBuf, ((SCell *)pVal) + vLen - nLeft, bytes);
memcpy(pBuf + bytes, &pgno, sizeof(pgno));
ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0);
if (ret < 0) {
tdbFree(pBuf);
return -1;
}
ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0);
if (ret < 0) {
tdbFree(pBuf);
return -1;
}
ofp = nextOfp;
nLeft -= bytes;
ofp = nextOfp;
nLeft -= bytes;
} while (nLeft > 0);
} else {
int nLeftKey = kLen;
// pack partial key and nextPgno
memcpy(pCell + nHeader, pKey, nLocal - 4);
nLeft -= nLocal - 4;
nLeftKey -= nLocal -4;
nLeftKey -= nLocal - 4;
memcpy(pCell + nHeader + nLocal - 4, &pgno, sizeof(pgno));
int lastKeyPageSpace = 0;
// pack left key & val to ovpages
do {
// cal key to cpy
int lastKeyPage = 0;
if (nLeftKey <= ofp->maxLocal - sizeof(SPgno)) {
bytes = nLeftKey;
lastKeyPage = 1;
lastKeyPageSpace = ofp->maxLocal - sizeof(SPgno) - nLeftKey;
} else {
bytes = ofp->maxLocal - sizeof(SPgno);
}
// cal key to cpy
int lastKeyPage = 0;
if (nLeftKey <= ofp->maxLocal - sizeof(SPgno)) {
bytes = nLeftKey;
lastKeyPage = 1;
lastKeyPageSpace = ofp->maxLocal - sizeof(SPgno) - nLeftKey;
} else {
bytes = ofp->maxLocal - sizeof(SPgno);
}
// cpy key
memcpy(pBuf, ((SCell *)pKey) + kLen - nLeftKey, bytes);
// cpy key
memcpy(pBuf, ((SCell *)pKey) + kLen - nLeftKey, bytes);
if (lastKeyPage) {
if (lastKeyPageSpace >= vLen) {
memcpy(pBuf + kLen -nLeftKey, pVal, vLen);
if (lastKeyPage) {
if (lastKeyPageSpace >= vLen) {
memcpy(pBuf + kLen - nLeftKey, pVal, vLen);
nLeft -= vLen;
pgno = 0;
} else {
memcpy(pBuf + kLen -nLeftKey, pVal, lastKeyPageSpace);
nLeft -= lastKeyPageSpace;
nLeft -= vLen;
pgno = 0;
} else {
memcpy(pBuf + kLen - nLeftKey, pVal, lastKeyPageSpace);
nLeft -= lastKeyPageSpace;
// fetch next ofp, a new ofp and make it dirty
ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
if (ret < 0) {
return -1;
}
}
} else {
// fetch next ofp, a new ofp and make it dirty
ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
if (ret < 0) {
return -1;
}
}
// fetch next ofp, a new ofp and make it dirty
ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
if (ret < 0) {
return -1;
}
}
} else {
// fetch next ofp, a new ofp and make it dirty
ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
if (ret < 0) {
return -1;
}
}
memcpy(pBuf + kLen - nLeft, &pgno, sizeof(pgno));
memcpy(pBuf + kLen - nLeft, &pgno, sizeof(pgno));
ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0);
if (ret < 0) {
return -1;
}
ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0);
if (ret < 0) {
return -1;
}
ofp = nextOfp;
nLeftKey -= bytes;
nLeft -= bytes;
ofp = nextOfp;
nLeftKey -= bytes;
nLeft -= bytes;
} while (nLeftKey > 0);
while (nLeft > 0) {
// pack left val data to ovpages
lastPage = 0;
if (nLeft <= maxLocal - sizeof(SPgno)) {
bytes = nLeft;
lastPage = 1;
} else {
bytes = maxLocal - sizeof(SPgno);
}
// pack left val data to ovpages
lastPage = 0;
if (nLeft <= maxLocal - sizeof(SPgno)) {
bytes = nLeft;
lastPage = 1;
} else {
bytes = maxLocal - sizeof(SPgno);
}
// fetch next ofp if not last page
if (!lastPage) {
// fetch a new ofp and make it dirty
ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
if (ret < 0) {
tdbFree(pBuf);
return -1;
}
} else {
pgno = 0;
}
// fetch next ofp if not last page
if (!lastPage) {
// fetch a new ofp and make it dirty
ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
if (ret < 0) {
tdbFree(pBuf);
return -1;
}
} else {
pgno = 0;
}
memcpy(pBuf, ((SCell *)pVal) + vLen - nLeft, bytes);
memcpy(pBuf + bytes, &pgno, sizeof(pgno));
memcpy(pBuf, ((SCell *)pVal) + vLen - nLeft, bytes);
memcpy(pBuf + bytes, &pgno, sizeof(pgno));
ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0);
if (ret < 0) {
tdbFree(pBuf);
return -1;
}
ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0);
if (ret < 0) {
tdbFree(pBuf);
return -1;
}
ofp = nextOfp;
nLeft -= bytes;
ofp = nextOfp;
nLeft -= bytes;
}
}
@ -1142,7 +1143,8 @@ static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const vo
return 0;
}
static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, SCellDecoder *pDecoder, TXN *pTxn, SBTree *pBt) {
static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, SCellDecoder *pDecoder, TXN *pTxn,
SBTree *pBt) {
int ret = 0;
int nPayload;
int maxLocal = pPage->maxLocal;
@ -1171,149 +1173,149 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
int surplus = minLocal + (nPayload + nHeader - minLocal) % (maxLocal - sizeof(SPgno));
int nLocal = surplus <= maxLocal ? surplus : minLocal;
int nLeft = nPayload;
SPgno pgno = 0;
int nLeft = nPayload;
SPgno pgno = 0;
SPage *ofp;
SCell *ofpCell;
int bytes;
int lastPage = 0;
int bytes;
int lastPage = 0;
if (nLocal >= pDecoder->kLen + 4) {
pDecoder->pKey = (SCell *)pCell + nHeader;
nLeft -= kLen;
if (nLocal > kLen + 4) {
// read partial val to local
pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen);
if (pDecoder->pVal == NULL) {
return -1;
}
TDB_CELLDECODER_SET_FREE_VAL(pDecoder);
// read partial val to local
pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen);
if (pDecoder->pVal == NULL) {
return -1;
}
TDB_CELLDECODER_SET_FREE_VAL(pDecoder);
memcpy(pDecoder->pVal, pCell + nHeader + kLen, nLocal - kLen - sizeof(SPgno));
memcpy(pDecoder->pVal, pCell + nHeader + kLen, nLocal - kLen - sizeof(SPgno));
nLeft -= nLocal - kLen - sizeof(SPgno);
nLeft -= nLocal - kLen - sizeof(SPgno);
}
memcpy(&pgno, pCell + nHeader + nPayload - nLeft, sizeof(pgno));
// unpack left val data from ovpages
while (pgno != 0) {
ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
if (ret < 0) {
return -1;
}
while (pgno != 0) {
ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
if (ret < 0) {
return -1;
}
ofpCell = tdbPageGetCell(ofp, 0);
ofpCell = tdbPageGetCell(ofp, 0);
if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
bytes = nLeft;
lastPage = 1;
} else {
bytes = ofp->maxLocal - sizeof(SPgno);
}
if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
bytes = nLeft;
lastPage = 1;
} else {
bytes = ofp->maxLocal - sizeof(SPgno);
}
memcpy(pDecoder->pVal + vLen - nLeft, ofpCell, bytes);
nLeft -= bytes;
memcpy(pDecoder->pVal + vLen - nLeft, ofpCell, bytes);
nLeft -= bytes;
memcpy(&pgno, ofpCell + bytes, sizeof(pgno));
memcpy(&pgno, ofpCell + bytes, sizeof(pgno));
}
} else {
int nLeftKey = kLen;
// load partial key and nextPgno
pDecoder->pKey = tdbRealloc(pDecoder->pKey, kLen);
if (pDecoder->pKey == NULL) {
return -1;
return -1;
}
TDB_CELLDECODER_SET_FREE_KEY(pDecoder);
memcpy(pDecoder->pKey, pCell + nHeader, nLocal - 4);
nLeft -= nLocal - 4;
nLeftKey -= nLocal -4;
nLeftKey -= nLocal - 4;
memcpy(&pgno, pCell + nHeader + nLocal - 4, sizeof(pgno));
int lastKeyPageSpace = 0;
// load left key & val to ovpages
while (pgno != 0) {
ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
if (ret < 0) {
return -1;
}
ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
if (ret < 0) {
return -1;
}
ofpCell = tdbPageGetCell(ofp, 0);
ofpCell = tdbPageGetCell(ofp, 0);
int lastKeyPage = 0;
if (nLeftKey <= maxLocal - sizeof(SPgno)) {
bytes = nLeftKey;
lastKeyPage = 1;
lastKeyPageSpace = ofp->maxLocal - sizeof(SPgno) - nLeftKey;
} else {
bytes = ofp->maxLocal - sizeof(SPgno);
}
int lastKeyPage = 0;
if (nLeftKey <= maxLocal - sizeof(SPgno)) {
bytes = nLeftKey;
lastKeyPage = 1;
lastKeyPageSpace = ofp->maxLocal - sizeof(SPgno) - nLeftKey;
} else {
bytes = ofp->maxLocal - sizeof(SPgno);
}
// cpy key
memcpy(pDecoder->pKey + kLen - nLeftKey, ofpCell, bytes);
// cpy key
memcpy(pDecoder->pKey + kLen - nLeftKey, ofpCell, bytes);
if (lastKeyPage) {
if (lastKeyPageSpace >= vLen) {
pDecoder->pVal = ofpCell + kLen -nLeftKey;
if (lastKeyPage) {
if (lastKeyPageSpace >= vLen) {
pDecoder->pVal = ofpCell + kLen - nLeftKey;
nLeft -= vLen;
pgno = 0;
} else {
// read partial val to local
pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen);
if (pDecoder->pVal == NULL) {
return -1;
}
TDB_CELLDECODER_SET_FREE_VAL(pDecoder);
nLeft -= vLen;
pgno = 0;
} else {
// read partial val to local
pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen);
if (pDecoder->pVal == NULL) {
return -1;
}
TDB_CELLDECODER_SET_FREE_VAL(pDecoder);
memcpy(pDecoder->pVal, ofpCell + kLen -nLeftKey, lastKeyPageSpace);
nLeft -= lastKeyPageSpace;
}
}
memcpy(pDecoder->pVal, ofpCell + kLen - nLeftKey, lastKeyPageSpace);
nLeft -= lastKeyPageSpace;
}
}
memcpy(&pgno, ofpCell + bytes, sizeof(pgno));
memcpy(&pgno, ofpCell + bytes, sizeof(pgno));
nLeftKey -= bytes;
nLeft -= bytes;
nLeftKey -= bytes;
nLeft -= bytes;
}
while (nLeft > 0) {
ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
if (ret < 0) {
return -1;
}
ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
if (ret < 0) {
return -1;
}
ofpCell = tdbPageGetCell(ofp, 0);
ofpCell = tdbPageGetCell(ofp, 0);
// load left val data to ovpages
lastPage = 0;
if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
bytes = nLeft;
lastPage = 1;
} else {
bytes = ofp->maxLocal - sizeof(SPgno);
}
// load left val data to ovpages
lastPage = 0;
if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
bytes = nLeft;
lastPage = 1;
} else {
bytes = ofp->maxLocal - sizeof(SPgno);
}
if (lastPage) {
pgno = 0;
}
if (lastPage) {
pgno = 0;
}
if (!pDecoder->pVal) {
pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen);
if (pDecoder->pVal == NULL) {
return -1;
}
TDB_CELLDECODER_SET_FREE_VAL(pDecoder);
}
if (!pDecoder->pVal) {
pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen);
if (pDecoder->pVal == NULL) {
return -1;
}
TDB_CELLDECODER_SET_FREE_VAL(pDecoder);
}
memcpy(pDecoder->pVal, ofpCell + vLen - nLeft, bytes);
nLeft -= bytes;
memcpy(pDecoder->pVal, ofpCell + vLen - nLeft, bytes);
nLeft -= bytes;
memcpy(&pgno, ofpCell + vLen - nLeft + bytes, sizeof(pgno));
memcpy(&pgno, ofpCell + vLen - nLeft + bytes, sizeof(pgno));
nLeft -= bytes;
nLeft -= bytes;
}
}
}
@ -1404,31 +1406,31 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *
// free ofp pages' cells
if (dropOfp) {
int ret = 0;
SPgno pgno = *(SPgno *) (pCell + nHeader + nLocal - sizeof(SPgno));
int nLeft = nPayload - nLocal + sizeof(SPgno);
int ret = 0;
SPgno pgno = *(SPgno *)(pCell + nHeader + nLocal - sizeof(SPgno));
int nLeft = nPayload - nLocal + sizeof(SPgno);
SPage *ofp;
int bytes;
int bytes;
while (pgno != 0) {
ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
if (ret < 0) {
return -1;
}
ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
if (ret < 0) {
return -1;
}
SCell *ofpCell = tdbPageGetCell(ofp, 0);
SCell *ofpCell = tdbPageGetCell(ofp, 0);
if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
bytes = nLeft;
} else {
bytes = ofp->maxLocal - sizeof(SPgno);
}
if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
bytes = nLeft;
} else {
bytes = ofp->maxLocal - sizeof(SPgno);
}
memcpy(&pgno, ofpCell + bytes, sizeof(pgno));
memcpy(&pgno, ofpCell + bytes, sizeof(pgno));
tdbPagerReturnPage(pPage->pPager, ofp, pTxn);
tdbPagerReturnPage(pPage->pPager, ofp, pTxn);
nLeft -= bytes;
nLeft -= bytes;
}
}
@ -1932,7 +1934,7 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int
// alloc space
szBuf = kLen + nData + 14;
pBuf = tdbRealloc(pBtc->pBt->pBuf, pBtc->pBt->pageSize > szBuf ? szBuf : pBtc->pBt->pageSize);
pBuf = tdbRealloc(pBtc->pBt->pBuf, pBtc->pBt->pageSize > szBuf ? szBuf : pBtc->pBt->pageSize);
if (pBuf == NULL) {
ASSERT(0);
return -1;

View File

@ -98,7 +98,7 @@ int tdbPagerClose(SPager *pPager) {
return 0;
}
int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate) {
int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate, SBTree *pBt) {
SPgno pgno;
SPage *pPage;
int ret;
@ -110,25 +110,41 @@ int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate) {
}
{
// TODO: try to search the main DB to get the page number
// pgno = 0;
// TODO: try to search the main DB to get the page number
// pgno = 0;
}
// if (pgno == 0 && toCreate) {
// ret = tdbPagerAllocPage(pPager, &pPage, &pgno);
// if (ret < 0) {
// return -1;
// }
if (pgno == 0 && toCreate) {
// allocate a new child page
TXN txn;
tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0);
// // TODO: Need to zero the page
pPager->inTran = 1;
// ret = tdbPagerWrite(pPager, pPage);
// if (ret < 0) {
// return -1;
// }
// }
SBtreeInitPageArg zArg;
zArg.flags = 0x1 | 0x2; // root leaf node;
zArg.pBt = pBt;
ret = tdbPagerFetchPage(pPager, &pgno, &pPage, tdbBtreeInitPage, &zArg, &txn);
if (ret < 0) {
return -1;
}
*ppgno = pgno;
// ret = tdbPagerAllocPage(pPager, &pPage, &pgno);
// if (ret < 0) {
// return -1;
//}
// TODO: Need to zero the page
ret = tdbPagerWrite(pPager, pPage);
if (ret < 0) {
return -1;
}
tdbTxnClose(&txn);
}
*ppgno = pgno;
return 0;
}
@ -427,9 +443,9 @@ static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) {
}
int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
int ret = 0;
int ret = 0;
SPgno journalSize = 0;
u8 *pageBuf = NULL;
u8 *pageBuf = NULL;
tdb_fd_t jfd = tdbOsOpen(pPager->jFileName, TDB_O_RDWR, 0755);
if (jfd == NULL) {
@ -454,7 +470,7 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) {
// read pgno & the page from journal
SPgno pgno;
SPgno pgno;
SPage *pPage;
int ret = tdbOsRead(jfd, &pgno, sizeof(pgno));

View File

@ -128,13 +128,13 @@ typedef struct SBtInfo {
#define TDB_CELLDECODER_FREE_VAL(pCellDecoder) ((pCellDecoder)->freeKV & TDB_CELLD_F_VAL)
typedef struct {
int kLen;
u8 *pKey;
int vLen;
u8 *pVal;
SPgno pgno;
u8 *pBuf;
u8 freeKV;
int kLen;
u8 *pKey;
int vLen;
u8 *pVal;
SPgno pgno;
u8 *pBuf;
u8 freeKV;
} SCellDecoder;
struct SBTC {
@ -184,7 +184,7 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int
int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager);
int tdbPagerClose(SPager *pPager);
int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate);
int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate, SBTree *pBt);
int tdbPagerWrite(SPager *pPager, SPage *pPage);
int tdbPagerBegin(SPager *pPager, TXN *pTxn);
int tdbPagerCommit(SPager *pPager, TXN *pTxn);
@ -192,7 +192,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initP
TXN *pTxn);
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn);
int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);
int tdbPagerRestore(SPager *pPager, SBTree *pBt);
int tdbPagerRestore(SPager *pPager, SBTree *pBt);
// tdbPCache.c ====================================
#define TDB_PCACHE_PAGE \
@ -314,19 +314,18 @@ static inline int tdbTryLockPage(tdb_spinlock_t *pLock) {
#define TDB_TRY_LOCK_PAGE(pPage) tdbTryLockPage(&((pPage)->lock))
// APIs
#define TDB_PAGE_TOTAL_CELLS(pPage) ((pPage)->nOverflow + (pPage)->pPageMethods->getCellNum(pPage))
#define TDB_PAGE_USABLE_SIZE(pPage) ((u8 *)(pPage)->pPageFtr - (pPage)->pCellIdx)
#define TDB_PAGE_FREE_SIZE(pPage) (*(pPage)->pPageMethods->getFreeBytes)(pPage)
#define TDB_PAGE_PGNO(pPage) ((pPage)->pgid.pgno)
#define TDB_BYTES_CELL_TAKEN(pPage, pCell) ((*(pPage)->xCellSize)(pPage, pCell, 0, NULL, NULL) + (pPage)->pPageMethods->szOffset)
#define TDB_PAGE_OFFSET_SIZE(pPage) ((pPage)->pPageMethods->szOffset)
#define TDB_PAGE_TOTAL_CELLS(pPage) ((pPage)->nOverflow + (pPage)->pPageMethods->getCellNum(pPage))
#define TDB_PAGE_USABLE_SIZE(pPage) ((u8 *)(pPage)->pPageFtr - (pPage)->pCellIdx)
#define TDB_PAGE_FREE_SIZE(pPage) (*(pPage)->pPageMethods->getFreeBytes)(pPage)
#define TDB_PAGE_PGNO(pPage) ((pPage)->pgid.pgno)
#define TDB_BYTES_CELL_TAKEN(pPage, pCell) \
((*(pPage)->xCellSize)(pPage, pCell, 0, NULL, NULL) + (pPage)->pPageMethods->szOffset)
#define TDB_PAGE_OFFSET_SIZE(pPage) ((pPage)->pPageMethods->szOffset)
int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t), void *arg);
int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg);
void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int,
TXN *, SBTree *pBt));
void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int,
TXN *, SBTree *pBt));
void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int, TXN *, SBTree *pBt));
void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int, TXN *, SBTree *pBt));
int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl);
int tdbPageDropCell(SPage *pPage, int idx, TXN *pTxn, SBTree *pBt);
int tdbPageUpdateCell(SPage *pPage, int idx, SCell *pCell, int szCell, TXN *pTxn, SBTree *pBt);

View File

@ -14,7 +14,7 @@ class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), True)
def prepare_datas(self):
tdSql.execute(
'''create table stb1
@ -22,7 +22,7 @@ class TDTestCase:
tags (t1 int)
'''
)
tdSql.execute(
'''
create table t1
@ -64,7 +64,7 @@ class TDTestCase:
( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
'''
)
def check_avg(self ,origin_query , check_query):
avg_result = tdSql.getResult(origin_query)
origin_result = tdSql.getResult(check_query)
@ -73,13 +73,13 @@ class TDTestCase:
for row_index , row in enumerate(avg_result):
for col_index , elem in enumerate(row):
if avg_result[row_index][col_index] != origin_result[row_index][col_index]:
check_status = False
check_status = False
if not check_status:
tdLog.notice("avg function value has not as expected , sql is \"%s\" "%origin_query )
sys.exit(1)
else:
tdLog.info("avg value check pass , it work as expected ,sql is \"%s\" "%check_query )
def test_errors(self):
error_sql_lists = [
"select avg from t1",
@ -113,42 +113,42 @@ class TDTestCase:
]
for error_sql in error_sql_lists:
tdSql.error(error_sql)
def support_types(self):
type_error_sql_lists = [
"select avg(ts) from t1" ,
"select avg(ts) from t1" ,
"select avg(c7) from t1",
"select avg(c8) from t1",
"select avg(c9) from t1",
"select avg(ts) from ct1" ,
"select avg(ts) from ct1" ,
"select avg(c7) from ct1",
"select avg(c8) from ct1",
"select avg(c9) from ct1",
"select avg(ts) from ct3" ,
"select avg(ts) from ct3" ,
"select avg(c7) from ct3",
"select avg(c8) from ct3",
"select avg(c9) from ct3",
"select avg(ts) from ct4" ,
"select avg(ts) from ct4" ,
"select avg(c7) from ct4",
"select avg(c8) from ct4",
"select avg(c9) from ct4",
"select avg(ts) from stb1" ,
"select avg(ts) from stb1" ,
"select avg(c7) from stb1",
"select avg(c8) from stb1",
"select avg(c9) from stb1" ,
"select avg(ts) from stbbb1" ,
"select avg(ts) from stbbb1" ,
"select avg(c7) from stbbb1",
"select avg(ts) from tbname",
"select avg(c9) from tbname"
]
for type_sql in type_error_sql_lists:
tdSql.error(type_sql)
type_sql_lists = [
"select avg(c1) from t1",
"select avg(c2) from t1",
@ -178,16 +178,16 @@ class TDTestCase:
"select avg(c5) from stb1",
"select avg(c6) from stb1",
"select avg(c6) as alisb from stb1",
"select avg(c6) alisb from stb1",
"select avg(c6) as alisb from stb1",
"select avg(c6) alisb from stb1",
]
for type_sql in type_sql_lists:
tdSql.query(type_sql)
def basic_avg_function(self):
# basic query
# basic query
tdSql.query("select c1 from ct3")
tdSql.checkRows(0)
tdSql.query("select c1 from t1")
@ -207,18 +207,18 @@ class TDTestCase:
tdSql.query("select avg(c5) from ct3")
tdSql.checkRows(0)
tdSql.query("select avg(c6) from ct3")
# used for regular table
tdSql.query("select avg(c1) from t1")
tdSql.checkData(0, 0, 5.000000000)
tdSql.query("select ts,c1, c2, c3 , c4, c5 from t1")
tdSql.checkData(1, 5, 1.11000)
tdSql.checkData(3, 4, 33)
tdSql.checkData(5, 5, None)
self.check_avg(" select avg(c1) , avg(c2) , avg(c3) from t1 " , " select sum(c1)/count(c1) , sum(c2)/count(c2) , sum(c3)/count(c3) from t1 ")
# used for sub table
tdSql.query("select avg(c1) from ct1")
tdSql.checkData(0, 0, 4.846153846)
@ -229,8 +229,8 @@ class TDTestCase:
self.check_avg(" select avg(abs(c1)) , avg(abs(c2)) , avg(abs(c3)) from t1 " , " select sum(abs(c1))/count(c1) , sum(abs(c2))/count(c2) , sum(abs(c3))/count(c3) from t1 ")
self.check_avg(" select avg(abs(c1)) , avg(abs(c2)) , avg(abs(c3)) from stb1 " , " select sum(abs(c1))/count(c1) , sum(abs(c2))/count(c2) , sum(abs(c3))/count(c3) from stb1 ")
# used for stable table
# used for stable table
tdSql.query("select avg(c1) from stb1")
tdSql.checkRows(1)
@ -241,10 +241,10 @@ class TDTestCase:
tdSql.error("select avg(c1) from tbname")
tdSql.error("select avg(c1) from ct5")
# mix with common col
# mix with common col
tdSql.error("select c1, avg(c1) from ct1")
tdSql.error("select c1, avg(c1) from ct4")
# mix with common functions
tdSql.error("select c1, avg(c1),c5, floor(c5) from ct4 ")
@ -278,11 +278,11 @@ class TDTestCase:
tdSql.query("select count(*) from stb1 ")
tdSql.checkData(0,0,25)
# bug fix for compute
# bug fix for compute
tdSql.error("select c1, avg(c1) -0 ,ceil(c1)-0 from ct4 ")
tdSql.error(" select c1, avg(c1) -0 ,avg(ceil(c1-0.1))-0.1 from ct4")
# mix with nest query
# mix with nest query
self.check_avg("select avg(col) from (select abs(c1) col from stb1)" , "select avg(abs(c1)) from stb1")
self.check_avg("select avg(col) from (select ceil(abs(c1)) col from stb1)" , "select avg(abs(c1)) from stb1")
@ -297,7 +297,7 @@ class TDTestCase:
tdSql.query(" select avg(c1) from stb1 where c1 is null ")
tdSql.checkRows(0)
def avg_func_filter(self):
tdSql.execute("use db")
tdSql.query(" select avg(c1), avg(c1) -0 ,avg(ceil(c1-0.1))-0 ,avg(floor(c1+0.1))-0.1 ,avg(ceil(log(c1,2)-0.5)) from ct4 where c1>5 ")
@ -324,7 +324,7 @@ class TDTestCase:
def avg_Arithmetic(self):
pass
def check_boundary_values(self):
tdSql.execute("drop database if exists bound_test")
@ -344,11 +344,11 @@ class TDTestCase:
tdSql.execute(
f"insert into sub1_bound values ( now(), 2147483646, 9223372036854775806, 32766, 126, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
)
tdSql.execute(
f"insert into sub1_bound values ( now(), 2147483645, 9223372036854775805, 32765, 125, 3.40E+37, 1.7e+307, True, 'binary_tb1', 'nchar_tb1', now() )"
)
tdSql.execute(
f"insert into sub1_bound values ( now(), 2147483644, 9223372036854775804, 32764, 124, 3.40E+37, 1.7e+307, True, 'binary_tb1', 'nchar_tb1', now() )"
)
@ -359,14 +359,14 @@ class TDTestCase:
tdSql.execute(
f"insert into sub1_bound values ( now(), 2147483646, 9223372036854775806, 32766, 126, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
)
tdSql.error(
f"insert into sub1_bound values ( now()+1s, 2147483648, 9223372036854775808, 32768, 128, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
)
self.check_avg("select avg(c1), avg(c2), avg(c3) , avg(c4), avg(c5) ,avg(c6) from sub1_bound " , " select sum(c1)/count(c1), sum(c2)/count(c2) ,sum(c3)/count(c3), sum(c4)/count(c4), sum(c5)/count(c5) ,sum(c6)/count(c6) from sub1_bound ")
# check basic elem for table per row
tdSql.query("select avg(c1) ,avg(c2) , avg(c3) , avg(c4), avg(c5), avg(c6) from sub1_bound ")
tdSql.checkRows(1)
@ -376,8 +376,8 @@ class TDTestCase:
tdSql.checkData(0,3,53.571428571)
tdSql.checkData(0,4,5.828571332045761e+37)
# tdSql.checkData(0,5,None)
# check + - * / in functions
tdSql.query(" select avg(c1+1) ,avg(c2) , avg(c3*1) , avg(c4/2), avg(c5)/2, avg(c6) from sub1_bound ")
tdSql.checkData(0,0,920350134.5714285)
@ -386,33 +386,33 @@ class TDTestCase:
tdSql.checkData(0,3,26.785714286)
tdSql.checkData(0,4,2.9142856660228804e+37)
# tdSql.checkData(0,5,None)
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
tdLog.printNoPrefix("==========step1:create table ==============")
self.prepare_datas()
tdLog.printNoPrefix("==========step2:test errors ==============")
tdLog.printNoPrefix("==========step2:test errors ==============")
self.test_errors()
tdLog.printNoPrefix("==========step3:support types ============")
tdLog.printNoPrefix("==========step3:support types ============")
self.support_types()
tdLog.printNoPrefix("==========step4: avg basic query ============")
tdLog.printNoPrefix("==========step4: avg basic query ============")
self.basic_avg_function()
tdLog.printNoPrefix("==========step5: avg boundary query ============")
tdLog.printNoPrefix("==========step5: avg boundary query ============")
self.check_boundary_values()
tdLog.printNoPrefix("==========step6: avg filter query ============")
tdLog.printNoPrefix("==========step6: avg filter query ============")
self.avg_func_filter()

View File

@ -687,8 +687,8 @@ class TDTestCase:
# to_json()
tdSql.query("select to_json('{\"abc\":123}') from jsons1_1")
tdSql.checkRows(2)
# tdSql.checkData(0, 0, '{"abc":123}')
# tdSql.checkData(1, 0, '{"abc":123}')
tdSql.checkData(0, 0, '{"abc":123}')
tdSql.checkData(1, 0, '{"abc":123}')
tdSql.query("select to_json('null') from jsons1_1")
tdSql.checkRows(2)
tdSql.checkData(0, 0, 'null')

View File

@ -3,7 +3,8 @@ from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
import time
from datetime import datetime
class TDTestCase:
def init(self, conn, logSql):
@ -13,205 +14,209 @@ class TDTestCase:
self.rowNum = 10
self.ts = 1537146000000 # 2018-9-17 09:00:00.000
self.ts_str = [
'2020-1-1',
'2020-2-1 00:00:01',
'2020-3-1 00:00:00.001',
'2020-4-1 00:00:00.001002',
'2020-5-1 00:00:00.001002001'
]
self.db_param_precision = ['ms','us','ns']
self.time_unit = ['1w','1d','1h','1m','1s','1a','1u']
self.error_unit = ['1b','2w','2d','2h','2m','2s','2a','2u','1c','#1']
self.ntbname = 'ntb'
self.stbname = 'stb'
self.ctbname = 'ctb'
def get_ms_timestamp(self,ts_str):
_ts_str = ts_str
if " " in ts_str:
p = ts_str.split(" ")[1]
if len(p) > 15 :
_ts_str = ts_str[:-3]
if ':' in _ts_str and '.' in _ts_str:
timestamp = datetime.strptime(_ts_str, "%Y-%m-%d %H:%M:%S.%f")
date_time = int(int(time.mktime(timestamp.timetuple()))*1000 + timestamp.microsecond/1000)
elif ':' in _ts_str and '.' not in _ts_str:
timestamp = datetime.strptime(_ts_str, "%Y-%m-%d %H:%M:%S")
date_time = int(int(time.mktime(timestamp.timetuple()))*1000 + timestamp.microsecond/1000)
else:
timestamp = datetime.strptime(_ts_str, "%Y-%m-%d")
date_time = int(int(time.mktime(timestamp.timetuple()))*1000 + timestamp.microsecond/1000)
return date_time
def get_us_timestamp(self,ts_str):
_ts = self.get_ms_timestamp(ts_str) * 1000
if " " in ts_str:
p = ts_str.split(" ")[1]
if len(p) > 12:
us_ts = p[12:15]
_ts += int(us_ts)
return _ts
def get_ns_timestamp(self,ts_str):
_ts = self.get_us_timestamp(ts_str) *1000
if " " in ts_str:
p = ts_str.split(" ")[1]
if len(p) > 15:
us_ts = p[15:]
_ts += int(us_ts)
return _ts
def time_transform(self,ts_str,precision):
date_time = []
if precision == 'ms':
for i in ts_str:
date_time.append(self.get_ms_timestamp(i))
elif precision == 'us':
for i in ts_str:
date_time.append(self.get_us_timestamp(i))
elif precision == 'ns':
for i in ts_str:
date_time.append(self.get_us_timestamp(i))
return date_time
def check_ms_timestamp(self,unit,date_time):
if unit.lower() == '1a':
for i in range(len(self.ts_str)):
ts_result = self.get_ms_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]))
elif unit.lower() == '1s':
for i in range(len(self.ts_str)):
ts_result = self.get_ms_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000)*1000)
elif unit.lower() == '1m':
for i in range(len(self.ts_str)):
ts_result = self.get_ms_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000/60)*60*1000)
elif unit.lower() == '1h':
for i in range(len(self.ts_str)):
ts_result = self.get_ms_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000/60/60)*60*60*1000 )
elif unit.lower() == '1d':
for i in range(len(self.ts_str)):
ts_result = self.get_ms_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000/60/60/24)*24*60*60*1000)
elif unit.lower() == '1w':
for i in range(len(self.ts_str)):
ts_result = self.get_ms_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000/60/60/24/7)*7*24*60*60*1000)
def check_us_timestamp(self,unit,date_time):
if unit.lower() == '1u':
for i in range(len(self.ts_str)):
ts_result = self.get_us_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]))
elif unit.lower() == '1a':
for i in range(len(self.ts_str)):
ts_result = self.get_us_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000)*1000)
elif unit.lower() == '1s':
for i in range(len(self.ts_str)):
ts_result = self.get_us_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000/1000)*1000*1000)
elif unit.lower() == '1m':
for i in range(len(self.ts_str)):
ts_result = self.get_us_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000/1000/60)*60*1000*1000)
elif unit.lower() == '1h':
for i in range(len(self.ts_str)):
ts_result = self.get_us_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000/1000/60/60)*60*60*1000*1000 )
elif unit.lower() == '1d':
for i in range(len(self.ts_str)):
ts_result = self.get_us_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000/1000/60/60/24)*24*60*60*1000*1000 )
elif unit.lower() == '1w':
for i in range(len(self.ts_str)):
ts_result = self.get_us_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000/1000/60/60/24/7)*7*24*60*60*1000*1000)
def check_ns_timestamp(self,unit,date_time):
if unit.lower() == '1u':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]*1000/1000)*1000)
elif unit.lower() == '1a':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]*1000/1000/1000)*1000*1000)
elif unit.lower() == '1s':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]*1000/1000/1000/1000)*1000*1000*1000)
elif unit.lower() == '1m':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]*1000/1000/1000/1000/60)*60*1000*1000*1000)
elif unit.lower() == '1h':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]*1000/1000/1000/1000/60/60)*60*60*1000*1000*1000 )
elif unit.lower() == '1d':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]*1000/1000/1000/1000/60/60/24)*24*60*60*1000*1000*1000 )
elif unit.lower() == '1w':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]*1000/1000/1000/1000/60/60/24/7)*7*24*60*60*1000*1000*1000)
def data_check(self,date_time,precision,tb_type):
for unit in self.time_unit:
if (unit.lower() == '1u' and precision.lower() == 'ms') or () :
if tb_type.lower() == 'ntb':
tdSql.error(f'select timetruncate(ts,{unit}) from {self.ntbname}')
elif tb_type.lower() == 'ctb':
tdSql.error(f'select timetruncate(ts,{unit}) from {self.ctbname}')
elif tb_type.lower() == 'stb':
tdSql.error(f'select timetruncate(ts,{unit}) from {self.stbname}')
elif precision.lower() == 'ms':
if tb_type.lower() == 'ntb':
tdSql.query(f'select timetruncate(ts,{unit}) from {self.ntbname}')
elif tb_type.lower() == 'ctb':
tdSql.query(f'select timetruncate(ts,{unit}) from {self.ctbname}')
elif tb_type.lower() == 'stb':
tdSql.query(f'select timetruncate(ts,{unit}) from {self.stbname}')
tdSql.checkRows(len(self.ts_str))
self.check_ms_timestamp(unit,date_time)
elif precision.lower() == 'us':
if tb_type.lower() == 'ntb':
tdSql.query(f'select timetruncate(ts,{unit}) from {self.ntbname}')
elif tb_type.lower() == 'ctb':
tdSql.query(f'select timetruncate(ts,{unit}) from {self.ctbname}')
elif tb_type.lower() == 'stb':
tdSql.query(f'select timetruncate(ts,{unit}) from {self.stbname}')
tdSql.checkRows(len(self.ts_str))
self.check_us_timestamp(unit,date_time)
elif precision.lower() == 'ns':
if tb_type.lower() == 'ntb':
tdSql.query(f'select timetruncate(ts,{unit}) from {self.ntbname}')
elif tb_type.lower() == 'ctb':
tdSql.query(f'select timetruncate(ts,{unit}) from {self.ctbname}')
elif tb_type.lower() == 'stb':
tdSql.query(f'select timetruncate(ts,{unit}) from {self.stbname}')
tdSql.checkRows(len(self.ts_str))
self.check_ns_timestamp(unit,date_time)
for unit in self.error_unit:
if tb_type.lower() == 'ntb':
tdSql.error(f'select timetruncate(ts,{unit}) from {self.ntbname}')
elif tb_type.lower() == 'ctb':
tdSql.error(f'select timetruncate(ts,{unit}) from {self.ctbname}')
elif tb_type.lower() == 'stb':
tdSql.error(f'select timetruncate(ts,{unit}) from {self.stbname}')
def function_check_ntb(self):
for precision in self.db_param_precision:
tdSql.execute('drop database if exists db')
tdSql.execute(f'create database db precision "{precision}"')
tdSql.execute('use db')
tdSql.execute(f'create table {self.ntbname} (ts timestamp,c0 int)')
for ts in self.ts_str:
tdSql.execute(f'insert into {self.ntbname} values("{ts}",1)')
date_time = self.time_transform(self.ts_str,precision)
self.data_check(date_time,precision,'ntb')
def function_check_stb(self):
for precision in self.db_param_precision:
tdSql.execute('drop database if exists db')
tdSql.execute(f'create database db precision "{precision}"')
tdSql.execute('use db')
tdSql.execute(f'create table {self.stbname} (ts timestamp,c0 int) tags(t0 int)')
tdSql.execute(f'create table {self.ctbname} using {self.stbname} tags(1)')
for ts in self.ts_str:
tdSql.execute(f'insert into {self.ctbname} values("{ts}",1)')
date_time = self.time_transform(self.ts_str,precision)
self.data_check(date_time,precision,'ctb')
self.data_check(date_time,precision,'stb')
def run(self):
tdSql.prepare()
intData = []
floatData = []
tdSql.execute('''create table stb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(loc nchar(20))''')
tdSql.execute("create table stb_1 using stb tags('beijing')")
tdSql.execute('''create table ntb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''')
for i in range(self.rowNum):
tdSql.execute("insert into ntb values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
% (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
intData.append(i + 1)
floatData.append(i + 0.1)
for i in range(self.rowNum):
tdSql.execute("insert into stb_1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
% (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
intData.append(i + 1)
floatData.append(i + 0.1)
tdSql.query("select timetruncate(1,1d) from ntb")
tdSql.checkRows(10)
tdSql.error("select timetruncate(1,1u) from ntb")
#tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1a) from ntb")
tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1m) from ntb")
tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1h) from ntb")
tdSql.checkRows(10)
tdSql.query("select timetruncate(ts,1d) from ntb")
tdSql.checkRows(10)
tdSql.checkData(0,0,"2018-09-17 08:00:00.000")
tdSql.query("select timetruncate(ts,1h) from ntb")
tdSql.checkRows(10)
tdSql.checkData(0,0,"2018-09-17 09:00:00.000")
tdSql.query("select timetruncate(ts,1m) from ntb")
tdSql.checkRows(10)
tdSql.checkData(0,0,"2018-09-17 09:00:00.000")
tdSql.query("select timetruncate(ts,1s) from ntb")
tdSql.checkRows(10)
tdSql.checkData(0,0,"2018-09-17 09:00:00.000")
tdSql.query("select timetruncate(ts,1a) from ntb")
tdSql.checkRows(10)
tdSql.checkData(0,0,"2018-09-17 09:00:00.000")
tdSql.checkData(1,0,"2018-09-17 09:00:00.001")
tdSql.checkData(2,0,"2018-09-17 09:00:00.002")
tdSql.checkData(3,0,"2018-09-17 09:00:00.003")
tdSql.checkData(4,0,"2018-09-17 09:00:00.004")
tdSql.checkData(5,0,"2018-09-17 09:00:00.005")
tdSql.checkData(6,0,"2018-09-17 09:00:00.006")
tdSql.checkData(7,0,"2018-09-17 09:00:00.007")
tdSql.checkData(8,0,"2018-09-17 09:00:00.008")
tdSql.checkData(9,0,"2018-09-17 09:00:00.009")
# tdSql.query("select timetruncate(ts,1u) from ntb")
# tdSql.checkRows(10)
# tdSql.checkData(0,0,"2018-09-17 09:00:00.000000")
# tdSql.checkData(1,0,"2018-09-17 09:00:00.001000")
# tdSql.checkData(2,0,"2018-09-17 09:00:00.002000")
# tdSql.checkData(3,0,"2018-09-17 09:00:00.003000")
# tdSql.checkData(4,0,"2018-09-17 09:00:00.004000")
# tdSql.checkData(5,0,"2018-09-17 09:00:00.005000")
# tdSql.checkData(6,0,"2018-09-17 09:00:00.006000")
# tdSql.checkData(7,0,"2018-09-17 09:00:00.007000")
# tdSql.checkData(8,0,"2018-09-17 09:00:00.008000")
# tdSql.checkData(9,0,"2018-09-17 09:00:00.009000")
# tdSql.query("select timetruncate(ts,1b) from ntb")
# tdSql.checkRows(10)
# tdSql.checkData(0,0,"2018-09-17 09:00:00.000000000")
# tdSql.checkData(1,0,"2018-09-17 09:00:00.001000000")
# tdSql.checkData(2,0,"2018-09-17 09:00:00.002000000")
# tdSql.checkData(3,0,"2018-09-17 09:00:00.003000000")
# tdSql.checkData(4,0,"2018-09-17 09:00:00.004000000")
# tdSql.checkData(5,0,"2018-09-17 09:00:00.005000000")
# tdSql.checkData(6,0,"2018-09-17 09:00:00.006000000")
# tdSql.checkData(7,0,"2018-09-17 09:00:00.007000000")
# tdSql.checkData(8,0,"2018-09-17 09:00:00.008000000")
# tdSql.checkData(9,0,"2018-09-17 09:00:00.009000000")
tdSql.query("select timetruncate(1,1d) from stb")
tdSql.checkRows(10)
tdSql.error("select timetruncate(1,1u) from stb")
#tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1a) from stb")
tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1m) from stb")
tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1h) from stb")
tdSql.checkRows(10)
tdSql.query("select timetruncate(ts,1d) from stb")
tdSql.checkRows(10)
tdSql.checkData(0,0,"2018-09-17 08:00:00.000")
tdSql.query("select timetruncate(ts,1h) from stb")
tdSql.checkRows(10)
tdSql.checkData(0,0,"2018-09-17 09:00:00.000")
tdSql.query("select timetruncate(ts,1m) from stb")
tdSql.checkRows(10)
tdSql.checkData(0,0,"2018-09-17 09:00:00.000")
tdSql.query("select timetruncate(ts,1s) from stb")
tdSql.checkRows(10)
tdSql.checkData(0,0,"2018-09-17 09:00:00.000")
tdSql.query("select timetruncate(ts,1a) from stb")
tdSql.checkRows(10)
tdSql.checkData(0,0,"2018-09-17 09:00:00.000")
tdSql.checkData(1,0,"2018-09-17 09:00:00.001")
tdSql.checkData(2,0,"2018-09-17 09:00:00.002")
tdSql.checkData(3,0,"2018-09-17 09:00:00.003")
tdSql.checkData(4,0,"2018-09-17 09:00:00.004")
tdSql.checkData(5,0,"2018-09-17 09:00:00.005")
tdSql.checkData(6,0,"2018-09-17 09:00:00.006")
tdSql.checkData(7,0,"2018-09-17 09:00:00.007")
tdSql.checkData(8,0,"2018-09-17 09:00:00.008")
tdSql.checkData(9,0,"2018-09-17 09:00:00.009")
# tdSql.query("select timetruncate(ts,1u) from stb")
# tdSql.checkRows(10)
# tdSql.checkData(0,0,"2018-09-17 09:00:00.000000")
# tdSql.checkData(1,0,"2018-09-17 09:00:00.001000")
# tdSql.checkData(2,0,"2018-09-17 09:00:00.002000")
# tdSql.checkData(3,0,"2018-09-17 09:00:00.003000")
# tdSql.checkData(4,0,"2018-09-17 09:00:00.004000")
# tdSql.checkData(5,0,"2018-09-17 09:00:00.005000")
# tdSql.checkData(6,0,"2018-09-17 09:00:00.006000")
# tdSql.checkData(7,0,"2018-09-17 09:00:00.007000")
# tdSql.checkData(8,0,"2018-09-17 09:00:00.008000")
# tdSql.checkData(9,0,"2018-09-17 09:00:00.009000")
# tdSql.query("select timetruncate(ts,1b) from stb")
# tdSql.checkRows(10)
# tdSql.checkData(0,0,"2018-09-17 09:00:00.000000000")
# tdSql.checkData(1,0,"2018-09-17 09:00:00.001000000")
# tdSql.checkData(2,0,"2018-09-17 09:00:00.002000000")
# tdSql.checkData(3,0,"2018-09-17 09:00:00.003000000")
# tdSql.checkData(4,0,"2018-09-17 09:00:00.004000000")
# tdSql.checkData(5,0,"2018-09-17 09:00:00.005000000")
# tdSql.checkData(6,0,"2018-09-17 09:00:00.006000000")
# tdSql.checkData(7,0,"2018-09-17 09:00:00.007000000")
# tdSql.checkData(8,0,"2018-09-17 09:00:00.008000000")
# tdSql.checkData(9,0,"2018-09-17 09:00:00.009000000")
tdSql.query("select timetruncate(1,1d) from stb_1")
tdSql.checkRows(10)
tdSql.error("select timetruncate(1,1u) from stb_1")
#tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1a) from stb_1")
tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1m) from stb_1")
tdSql.checkRows(10)
tdSql.query("select timetruncate(1,1h) from stb_1")
tdSql.checkRows(10)
tdSql.query("select timetruncate(ts,1d) from stb_1")
tdSql.checkRows(10)
tdSql.checkData(0,0,"2018-09-17 08:00:00.000")
tdSql.query("select timetruncate(ts,1h) from stb_1")
tdSql.checkRows(10)
tdSql.checkData(0,0,"2018-09-17 09:00:00.000")
tdSql.query("select timetruncate(ts,1m) from stb_1")
tdSql.checkRows(10)
tdSql.checkData(0,0,"2018-09-17 09:00:00.000")
tdSql.query("select timetruncate(ts,1s) from stb_1")
tdSql.checkRows(10)
tdSql.checkData(0,0,"2018-09-17 09:00:00.000")
tdSql.query("select timetruncate(ts,1a) from stb_1")
tdSql.checkRows(10)
tdSql.checkData(0,0,"2018-09-17 09:00:00.000")
tdSql.checkData(1,0,"2018-09-17 09:00:00.001")
tdSql.checkData(2,0,"2018-09-17 09:00:00.002")
tdSql.checkData(3,0,"2018-09-17 09:00:00.003")
tdSql.checkData(4,0,"2018-09-17 09:00:00.004")
tdSql.checkData(5,0,"2018-09-17 09:00:00.005")
tdSql.checkData(6,0,"2018-09-17 09:00:00.006")
tdSql.checkData(7,0,"2018-09-17 09:00:00.007")
tdSql.checkData(8,0,"2018-09-17 09:00:00.008")
tdSql.checkData(9,0,"2018-09-17 09:00:00.009")
# tdSql.query("select timetruncate(ts,1u) from stb_1")
# tdSql.checkRows(10)
# tdSql.checkData(0,0,"2018-09-17 09:00:00.000000")
# tdSql.checkData(1,0,"2018-09-17 09:00:00.001000")
# tdSql.checkData(2,0,"2018-09-17 09:00:00.002000")
# tdSql.checkData(3,0,"2018-09-17 09:00:00.003000")
# tdSql.checkData(4,0,"2018-09-17 09:00:00.004000")
# tdSql.checkData(5,0,"2018-09-17 09:00:00.005000")
# tdSql.checkData(6,0,"2018-09-17 09:00:00.006000")
# tdSql.checkData(7,0,"2018-09-17 09:00:00.007000")
# tdSql.checkData(8,0,"2018-09-17 09:00:00.008000")
# tdSql.checkData(9,0,"2018-09-17 09:00:00.009000")
# tdSql.query("select timetruncate(ts,1b) from stb_1")
# tdSql.checkRows(10)
# tdSql.checkData(0,0,"2018-09-17 09:00:00.000000000")
# tdSql.checkData(1,0,"2018-09-17 09:00:00.001000000")
# tdSql.checkData(2,0,"2018-09-17 09:00:00.002000000")
# tdSql.checkData(3,0,"2018-09-17 09:00:00.003000000")
# tdSql.checkData(4,0,"2018-09-17 09:00:00.004000000")
# tdSql.checkData(5,0,"2018-09-17 09:00:00.005000000")
# tdSql.checkData(6,0,"2018-09-17 09:00:00.006000000")
# tdSql.checkData(7,0,"2018-09-17 09:00:00.007000000")
# tdSql.checkData(8,0,"2018-09-17 09:00:00.008000000")
# tdSql.checkData(9,0,"2018-09-17 09:00:00.009000000")
self.function_check_ntb()
self.function_check_stb()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)

View File

@ -0,0 +1,264 @@
import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.vgroups = 1
self.ctbNum = 1
self.rowsPerTbl = 100000
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1,
'rowsPerTbl': 100000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdDnodes.stop(1)
tdDnodes.start(1)
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s where (sin(c2) >= 0) and (c1 %% 4 == 0) and (ts >= %d) and (t4 like 'shanghai')"%(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+9379)
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("%d tmq consume rows error!"%consumerId)
tmqCom.checkFileContent(consumerId, queryString)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s where (sin(c2) >= 0) and (c1 %% 4 == 0) and (ts >= %d) and (t4 like 'shanghai')"%(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+9379)
# queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
totalRowsInserted = expectRowsList[0]
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 1
expectrowcnt = math.ceil(totalRowsInserted/3)
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 0")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
firstConsumeRows = resultList[0]
if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]):
tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
consumerId = 2
expectrowcnt = math.ceil(totalRowsInserted/3)
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 1")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
secondConsumeRows = resultList[0]
if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]):
tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
consumerId = 3
expectrowcnt = math.ceil(totalRowsInserted/3)
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 1")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
thirdConsumeRows = resultList[0]
if not (totalRowsInserted >= resultList[0]):
tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
# total consume
actConsumeTotalRows = firstConsumeRows + secondConsumeRows + thirdConsumeRows
if not (totalRowsInserted == actConsumeTotalRows):
tdLog.info("sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
tdSql.prepare()
self.prepareTestEnv()
self.tmqCase1()
self.tmqCase2()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,245 @@
import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.vgroups = 4
self.ctbNum = 3000
self.rowsPerTbl = 150
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 200,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdDnodes.stop(1)
# tdDnodes.start(1)
tdDnodes.starttaosd(1)
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s where (sin(c2) >= 0) and (c1 %% 4 == 0) and (ts >= %d) and (t4 like 'shanghai')"%(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+math.ceil(self.rowsPerTbl/5))
# queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("%d tmq consume rows error!"%consumerId)
# tmqCom.checkFileContent(consumerId, queryString)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 20,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s where (sin(c2) >= 0) and (c1 %% 4 == 0) and (ts >= %d) and (t4 like 'shanghai')"%(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+math.ceil(self.rowsPerTbl/5))
# queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
totalRowsInserted = expectRowsList[0]
tdLog.info("select result rows: %d"%totalRowsInserted)
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 1
expectrowcnt = math.ceil(totalRowsInserted/3)
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 0")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]):
tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
firstConsumeRows = resultList[0]
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
consumerId = 2
expectrowcnt = math.ceil(totalRowsInserted*2/3)
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 1")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeTotalRows = firstConsumeRows + resultList[0]
if not (expectrowcnt >= resultList[0] and totalRowsInserted == actConsumeTotalRows):
tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0]))
tdLog.info("and sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
tdSql.prepare()
self.prepareTestEnv()
self.tmqCase1()
self.tmqCase2()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,242 @@
import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.vgroups = 4
self.ctbNum = 3000
self.rowsPerTbl = 70
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdDnodes.stop(1)
# tdDnodes.start(1)
tdDnodes.starttaosd(1)
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("%d tmq consume rows error!"%consumerId)
# tmqCom.checkFileContent(consumerId, queryString)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
totalRowsInserted = expectRowsList[0]
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 1
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] / 3)
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 0")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]):
tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
firstConsumeRows = resultList[0]
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
consumerId = 2
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3)
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 1")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeTotalRows = firstConsumeRows + resultList[0]
if not (expectrowcnt >= resultList[0] and totalRowsInserted == actConsumeTotalRows):
tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0]))
tdLog.info("and sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
tdSql.prepare()
self.prepareTestEnv()
self.tmqCase1()
self.tmqCase2()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,243 @@
import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.vgroups = 1
self.ctbNum = 1
self.rowsPerTbl = 1000000
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 1000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdDnodes.stop(1)
tdDnodes.start(1)
return
def tmqCase3(self):
tdLog.printNoPrefix("======== test case 3: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 15,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
# queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s where (sin(c2) >= 0) and (c1 %% 4 == 0) and (ts >= %d) and (t4 like 'shanghai')"%(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+9379)
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
totalRowsInserted = expectRowsList[0]
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 3
expectrowcnt = math.ceil(totalRowsInserted/3)
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
consumerId = 4
expectrowcnt = math.ceil(totalRowsInserted * 2/3)
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 0")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 2
resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeTotalRows = resultList[0] + resultList[1]
if not (totalRowsInserted == actConsumeTotalRows):
tdLog.info("sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 3 end ...... ")
def tmqCase4(self):
tdLog.printNoPrefix("======== test case 4: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 25,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s where (sin(c2) >= 0) and (c1 %% 4 == 0) and (ts >= %d) and (t4 like 'shanghai')"%(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+9379)
# queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
totalRowsInserted = expectRowsList[0]
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 5
expectrowcnt = math.ceil(totalRowsInserted)
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:500, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 0")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait commit notify")
tmqCom.getStartCommitNotifyFromTmqsim()
tdLog.info("pkill consume processor")
tdCom.killProcessor("tmq_sim")
# time.sleep(10)
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
consumerId = 6
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 1")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeTotalRows = resultList[0]
if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted):
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 4 end ...... ")
def run(self):
tdSql.prepare()
self.prepareTestEnv()
self.tmqCase3()
self.tmqCase4()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,247 @@
import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.vgroups = 4
self.ctbNum = 4000
self.rowsPerTbl = 150
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 200,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdDnodes.stop(1)
tdDnodes.start(1)
return
def tmqCase3(self):
tdLog.printNoPrefix("======== test case 3: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s where (sin(c2) >= 0) and (c1 %% 4 == 0) and (ts >= %d) and (t4 like 'shanghai')"%(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+math.ceil(self.rowsPerTbl/5))
# queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
totalRowsInserted = expectRowsList[0]
tdLog.info("select result rows: %d"%totalRowsInserted)
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 3
expectrowcnt = math.ceil(totalRowsInserted/3)
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
consumerId = 4
expectrowcnt = math.ceil(totalRowsInserted*2/3)
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 0")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 2
resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeTotalRows = resultList[0] + resultList[1]
if not (totalRowsInserted == actConsumeTotalRows):
tdLog.info("sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 3 end ...... ")
def tmqCase4(self):
tdLog.printNoPrefix("======== test case 4: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s where (sin(c2) >= 0) and (c1 %% 4 == 0) and (ts >= %d) and (t4 like 'shanghai')"%(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+math.ceil(self.rowsPerTbl/5))
# queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
totalRowsInserted = expectRowsList[0]
tdLog.info("select result rows: %d"%totalRowsInserted)
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 5
expectrowcnt = math.ceil(totalRowsInserted)
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:300, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 0")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait commit notify")
tmqCom.getStartCommitNotifyFromTmqsim()
# tdLog.info("wait start consume notify")
# tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("pkill consume processor")
tdCom.killProcessor("tmq_sim")
# time.sleep(10)
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
consumerId = 6
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 1")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeTotalRows = resultList[0]
if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted):
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 4 end ...... ")
def run(self):
tdSql.prepare()
self.prepareTestEnv()
self.tmqCase3()
self.tmqCase4()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,241 @@
import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.vgroups = 4
self.ctbNum = 3000
self.rowsPerTbl = 70
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdDnodes.stop(1)
tdDnodes.start(1)
return
def tmqCase3(self):
tdLog.printNoPrefix("======== test case 3: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
totalRowsInserted = expectRowsList[0]
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 3
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] / 3)
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
consumerId = 4
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3)
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 0")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 2
resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeTotalRows = resultList[0] + resultList[1]
if not (totalRowsInserted == actConsumeTotalRows):
tdLog.info("sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 3 end ...... ")
def tmqCase4(self):
tdLog.printNoPrefix("======== test case 4: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
totalRowsInserted = expectRowsList[0]
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 5
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"])
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 0")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait commit notify")
tmqCom.getStartCommitNotifyFromTmqsim()
tdLog.info("pkill consume processor")
tdCom.killProcessor("tmq_sim")
# time.sleep(10)
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
consumerId = 6
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 1")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeTotalRows = resultList[0]
if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted):
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 4 end ...... ")
def run(self):
tdSql.prepare()
self.prepareTestEnv()
self.tmqCase3()
self.tmqCase4()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -165,3 +165,10 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg.py
python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg.py
python3 ./test.py -f 7-tmq/tmqConsFromTsdb-1ctb.py
python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-1ctb.py
python3 ./test.py -f 7-tmq/tmqConsFromTsdb-1ctb-funcNFilter.py
python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb-funcNFilter.py
python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py
python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py
python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py

@ -1 +1 @@
Subproject commit c885e967e490105999b84d009a15168728dfafaf
Subproject commit c3815951fc80617ecd171f3743b8b4a4d0bc712e