support count empty table
This commit is contained in:
parent
709b7f4683
commit
e3098accc4
|
@ -1,4 +1,4 @@
|
|||
build/
|
||||
*build/
|
||||
compile_commands.json
|
||||
CMakeSettings.json
|
||||
.cache
|
||||
|
@ -132,3 +132,5 @@ tools/taos-tools
|
|||
tools/taosws-rs
|
||||
tags
|
||||
.clangd
|
||||
*CMakeCache*
|
||||
*CMakeFiles*
|
||||
|
|
|
@ -434,6 +434,7 @@ typedef struct STableScanPhysiNode {
|
|||
bool assignBlockUid;
|
||||
int8_t igCheckUpdate;
|
||||
bool filesetDelimited;
|
||||
bool needCountEmptyTable;
|
||||
} STableScanPhysiNode;
|
||||
|
||||
typedef STableScanPhysiNode STableSeqScanPhysiNode;
|
||||
|
|
|
@ -363,6 +363,7 @@ typedef struct SSelectStmt {
|
|||
bool hasLastRowFunc;
|
||||
bool hasLastFunc;
|
||||
bool hasTimeLineFunc;
|
||||
bool hasCountFunc;
|
||||
bool hasUdaf;
|
||||
bool hasStateKey;
|
||||
bool onlyHasKeepOrderFunc;
|
||||
|
|
|
@ -750,6 +750,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
|
|||
}
|
||||
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
|
||||
taosMemoryFree(pReq);
|
||||
pReq = NULL;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
|
|
|
@ -262,6 +262,7 @@ typedef struct STableScanInfo {
|
|||
int32_t scanTimes;
|
||||
SSDataBlock* pResBlock;
|
||||
SHashObj* pIgnoreTables;
|
||||
SHashObj* pValuedTables; // non empty table uids
|
||||
SSampleExecInfo sample; // sample execution info
|
||||
int32_t currentGroupId;
|
||||
int32_t currentTable;
|
||||
|
@ -269,8 +270,9 @@ typedef struct STableScanInfo {
|
|||
int8_t assignBlockUid;
|
||||
bool hasGroupByTag;
|
||||
bool countOnly;
|
||||
// TsdReader readerAPI;
|
||||
bool filesetDelimited;
|
||||
bool needCountEmptyTable;
|
||||
bool processingEmptyTable;
|
||||
} STableScanInfo;
|
||||
|
||||
typedef struct STableMergeScanInfo {
|
||||
|
|
|
@ -655,6 +655,50 @@ void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData,
|
|||
colDataDestroy(&infoData);
|
||||
}
|
||||
|
||||
|
||||
// record processed (non empty) table
|
||||
static int32_t insertTableToProcessed(STableScanInfo* pTableScanInfo, uint64_t uid) {
|
||||
if (!pTableScanInfo->needCountEmptyTable) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (NULL == pTableScanInfo->pValuedTables) {
|
||||
int32_t tableNum = taosArrayGetSize(pTableScanInfo->base.pTableListInfo->pTableList);
|
||||
pTableScanInfo->pValuedTables =
|
||||
taosHashInit(tableNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||
if (NULL == pTableScanInfo->pValuedTables) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
taosHashPut(pTableScanInfo->pValuedTables, &uid, sizeof(uid), &pTableScanInfo->scanTimes,
|
||||
sizeof(pTableScanInfo->scanTimes));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SSDataBlock* getBlockForEmptyTable(SOperatorInfo* pOperator, const STableKeyInfo* tbInfo) {
|
||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
||||
|
||||
blockDataEmpty(pBlock);
|
||||
pBlock->info.rows = 1;
|
||||
pBlock->info.id.uid = tbInfo->uid;
|
||||
pBlock->info.id.groupId = pOperator->dynamicTask ? tbInfo->uid : tbInfo->groupId;
|
||||
|
||||
// only one row: set all col data to null & hasNull
|
||||
int32_t col_num = blockDataGetNumOfCols(pBlock);
|
||||
for (int32_t i = 0; i < col_num; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
colDataSetNULL(pColInfoData, 0);
|
||||
}
|
||||
|
||||
// set tag/tbname
|
||||
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
|
||||
|
||||
pOperator->resultInfo.totalRows++;
|
||||
return pBlock;
|
||||
}
|
||||
|
||||
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
@ -722,7 +766,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
|
||||
static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator, const STableKeyInfo* pList, int32_t num) {
|
||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
|
@ -736,6 +780,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
|
|||
while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
|
||||
SSDataBlock* p = doTableScanImpl(pOperator);
|
||||
if (p != NULL) {
|
||||
insertTableToProcessed(pTableScanInfo, p->info.id.uid);
|
||||
return p;
|
||||
}
|
||||
|
||||
|
@ -764,6 +809,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
|
|||
while (pTableScanInfo->scanTimes < total) {
|
||||
SSDataBlock* p = doTableScanImpl(pOperator);
|
||||
if (p != NULL) {
|
||||
insertTableToProcessed(pTableScanInfo, p->info.id.uid);
|
||||
return p;
|
||||
}
|
||||
|
||||
|
@ -780,6 +826,39 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
if (pTableScanInfo->needCountEmptyTable) {
|
||||
if (num == 0 && 0 == taosHashGetSize(pTableScanInfo->pValuedTables)) {
|
||||
// table by table, num is 0
|
||||
if (!pTableScanInfo->processingEmptyTable) {
|
||||
pTableScanInfo->processingEmptyTable = true;
|
||||
// current table is empty, fill result block info & return
|
||||
const STableKeyInfo* info = tableListGetInfo(pTableScanInfo->base.pTableListInfo, pTableScanInfo->currentTable);
|
||||
return getBlockForEmptyTable(pOperator, info);
|
||||
}
|
||||
|
||||
} else if (num > taosHashGetSize(pTableScanInfo->pValuedTables)) {
|
||||
// group by group, num >= 1
|
||||
if (!pTableScanInfo->processingEmptyTable) {
|
||||
pTableScanInfo->processingEmptyTable = true;
|
||||
pTableScanInfo->currentTable = 0;
|
||||
}
|
||||
if (pTableScanInfo->currentTable < num) {
|
||||
// loop: get empty table uid & process
|
||||
while (pTableScanInfo->currentTable < num) {
|
||||
const STableKeyInfo* info = pList + pTableScanInfo->currentTable++;
|
||||
if (pTableScanInfo->pValuedTables &&
|
||||
NULL != taosHashGet(pTableScanInfo->pValuedTables, &info->uid, sizeof(info->uid))) {
|
||||
} else {
|
||||
return getBlockForEmptyTable(pOperator, info);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pTableScanInfo->processingEmptyTable = false;
|
||||
}
|
||||
taosHashClear(pTableScanInfo->pValuedTables);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -861,7 +940,7 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) {
|
|||
pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
|
||||
pInfo->scanTimes = 0;
|
||||
|
||||
SSDataBlock* result = doGroupedTableScan(pOperator);
|
||||
SSDataBlock* result = doGroupedTableScan(pOperator, pList, num);
|
||||
if (result != NULL) {
|
||||
if (pOperator->dynamicTask) {
|
||||
result->info.id.groupId = result->info.id.uid;
|
||||
|
@ -876,15 +955,16 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
|
|||
STableScanInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
int32_t num = 0;
|
||||
STableKeyInfo* pList = NULL;
|
||||
|
||||
if (pInfo->currentGroupId == -1) {
|
||||
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
|
||||
int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
|
||||
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo) || numOfTables == 0) {
|
||||
setOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t num = 0;
|
||||
STableKeyInfo* pList = NULL;
|
||||
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
|
||||
ASSERT(pInfo->base.dataReader == NULL);
|
||||
|
||||
|
@ -899,9 +979,11 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
|
|||
if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) {
|
||||
pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity;
|
||||
}
|
||||
} else {
|
||||
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
|
||||
}
|
||||
|
||||
SSDataBlock* result = doGroupedTableScan(pOperator);
|
||||
SSDataBlock* result = doGroupedTableScan(pOperator, pList, num);
|
||||
if (result != NULL) {
|
||||
if (pOperator->dynamicTask) {
|
||||
result->info.id.groupId = result->info.id.uid;
|
||||
|
@ -923,7 +1005,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
|||
STableScanInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
|
||||
|
||||
if (pOperator->pOperatorGetParam) {
|
||||
pOperator->dynamicTask = true;
|
||||
int32_t code = createTableListInfoFromParam(pOperator);
|
||||
|
@ -952,7 +1034,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
|||
STableKeyInfo tInfo = {0};
|
||||
|
||||
while (1) {
|
||||
SSDataBlock* result = doGroupedTableScan(pOperator);
|
||||
SSDataBlock* result = doGroupedTableScan(pOperator, NULL, 0);
|
||||
if (result || (pOperator->status == OP_EXEC_DONE) || isTaskKilled(pTaskInfo)) {
|
||||
return result;
|
||||
}
|
||||
|
@ -1012,6 +1094,7 @@ static void destroyTableScanOperatorInfo(void* param) {
|
|||
STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
|
||||
blockDataDestroy(pTableScanInfo->pResBlock);
|
||||
taosHashCleanup(pTableScanInfo->pIgnoreTables);
|
||||
taosHashCleanup(pTableScanInfo->pValuedTables);
|
||||
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
@ -1075,6 +1158,9 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
pTaskInfo);
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
|
||||
pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable;
|
||||
pInfo->processingEmptyTable = false;
|
||||
|
||||
pInfo->base.pTableListInfo = pTableListInfo;
|
||||
pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
|
||||
if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
|
||||
|
|
|
@ -652,6 +652,7 @@ static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhy
|
|||
COPY_SCALAR_FIELD(watermark);
|
||||
COPY_SCALAR_FIELD(igExpired);
|
||||
COPY_SCALAR_FIELD(filesetDelimited);
|
||||
COPY_SCALAR_FIELD(needCountEmptyTable);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -1841,6 +1841,7 @@ static const char* jkTableScanPhysiPlanSubtable = "Subtable";
|
|||
static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid";
|
||||
static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate";
|
||||
static const char* jkTableScanPhysiPlanFilesetDelimited = "FilesetDelimited";
|
||||
static const char* jkTableScanPhysiPlanNeedCountEmptyTable = "NeedCountEmptyTable";
|
||||
|
||||
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
|
||||
|
@ -1912,7 +1913,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanFilesetDelimited, pNode->filesetDelimited);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, pNode->needCountEmptyTable);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1986,7 +1989,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanFilesetDelimited, &pNode->filesetDelimited);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, &pNode->needCountEmptyTable);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -2170,6 +2170,9 @@ static int32_t physiTableScanNodeInlineToMsg(const void* pObj, STlvEncoder* pEnc
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeValueBool(pEncoder, pNode->filesetDelimited);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeValueBool(pEncoder, pNode->needCountEmptyTable);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -2251,7 +2254,9 @@ static int32_t msgToPhysiTableScanNodeInline(STlvDecoder* pDecoder, void* pObj)
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvDecodeValueBool(pDecoder, &pNode->filesetDelimited);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvDecodeValueBool(pDecoder, &pNode->needCountEmptyTable);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -1902,6 +1902,7 @@ static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) {
|
|||
if (NULL != pCurrStmt && QUERY_NODE_SELECT_STMT == nodeType(pCurrStmt)) {
|
||||
SSelectStmt* pSelect = (SSelectStmt*)pCurrStmt;
|
||||
pSelect->hasAggFuncs = pSelect->hasAggFuncs ? true : fmIsAggFunc(pFunc->funcId);
|
||||
pSelect->hasCountFunc = pSelect->hasCountFunc ? true : (FUNCTION_TYPE_COUNT == pFunc->funcType);
|
||||
pSelect->hasRepeatScanFuncs = pSelect->hasRepeatScanFuncs ? true : fmIsRepeatScanFunc(pFunc->funcId);
|
||||
|
||||
if (fmIsIndefiniteRowsFunc(pFunc->funcId)) {
|
||||
|
|
|
@ -587,6 +587,27 @@ static int32_t createTableCountScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
|
|||
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
|
||||
}
|
||||
|
||||
static bool calcNeedCountEmpty(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) {
|
||||
// refuse interval
|
||||
if (pScanLogicNode->interval > 0) {
|
||||
return false;
|
||||
}
|
||||
SNode* pRoot = pCxt->pPlanCxt->pAstRoot;
|
||||
if (QUERY_NODE_SELECT_STMT == nodeType(pRoot)) {
|
||||
SSelectStmt* pSelect = (SSelectStmt*)pRoot;
|
||||
// select & count
|
||||
if (pSelect->hasCountFunc) {
|
||||
// key only accept tag/tbname
|
||||
if (NULL != pSelect->pGroupByList) {
|
||||
return !keysHasCol(pSelect->pGroupByList);
|
||||
} else if (NULL != pSelect->pPartitionByList) {
|
||||
return !keysHasCol(pSelect->pPartitionByList);
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
|
||||
SPhysiNode** pPhyNode) {
|
||||
STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode,
|
||||
|
@ -623,6 +644,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
|
|||
pTableScan->igCheckUpdate = pScanLogicNode->igCheckUpdate;
|
||||
pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false;
|
||||
pTableScan->filesetDelimited = pScanLogicNode->filesetDelimited;
|
||||
pTableScan->needCountEmptyTable = calcNeedCountEmpty(pCxt, pScanLogicNode);
|
||||
|
||||
int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
|
|
@ -347,6 +347,7 @@ e
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/concat_ws2.py -R
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/cos.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/cos.py -R
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/group_partition.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -R
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count.py
|
||||
|
|
|
@ -57,8 +57,11 @@ class TDTestCase:
|
|||
tdSql.query(f'select count(*) from {self.stbname}')
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, 0)
|
||||
rows = [2, 0]
|
||||
function_names = ['count', 'hyperloglog']
|
||||
for function_name in function_names:
|
||||
for i in range(2):
|
||||
function_name = function_names[i]
|
||||
row = rows[i]
|
||||
tdSql.query(f'select {function_name}(tbname) from {self.stbname}')
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, 0)
|
||||
|
@ -93,17 +96,17 @@ class TDTestCase:
|
|||
tdSql.query(f'select sum(1),max(c2),min(1),leastsquares(c1,1,1) from {self.stbname}')
|
||||
tdSql.checkRows(0)
|
||||
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} group by tbname')
|
||||
tdSql.checkRows(0)
|
||||
tdSql.checkRows(row)
|
||||
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} group by c1')
|
||||
tdSql.checkRows(0)
|
||||
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} group by t0')
|
||||
tdSql.checkRows(0)
|
||||
tdSql.checkRows(row)
|
||||
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} partition by tbname')
|
||||
tdSql.checkRows(0)
|
||||
tdSql.checkRows(row)
|
||||
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} partition by c1')
|
||||
tdSql.checkRows(0)
|
||||
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} partition by t0')
|
||||
tdSql.checkRows(0)
|
||||
tdSql.checkRows(row)
|
||||
tdSql.query(f'select {function_name}(1) from (select {function_name}(c1),sum(c1) from {self.stbname} group by c1)')
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, 0)
|
||||
|
@ -113,17 +116,24 @@ class TDTestCase:
|
|||
tdSql.checkRows(0)
|
||||
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} partition by c1 interval(1s)')
|
||||
tdSql.checkRows(0)
|
||||
tdSql.query(f'select {function_name}(1),sum(1) from (select {function_name}(1) from {self.stbname} group by tbname)')
|
||||
tdSql.query(f'select {function_name}(1),sum(1) from (select {function_name}(1) from {self.stbname} group by tbname order by tbname)')
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, 0)
|
||||
tdSql.checkData(0, 1, None)
|
||||
if 'count' == function_name:
|
||||
tdSql.checkData(0, 0, 0)
|
||||
tdSql.checkData(0, 1, None)
|
||||
elif 'hyperloglog' == function_name:
|
||||
tdSql.checkData(0, 0, 0)
|
||||
tdSql.checkData(0, 0, 0)
|
||||
|
||||
def query_empty_ntb(self):
|
||||
tdSql.query(f'select count(*) from {self.ntbname}')
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, 0)
|
||||
rows = [1, 0]
|
||||
function_names = ['count', 'hyperloglog']
|
||||
for function_name in function_names:
|
||||
for i in range(2):
|
||||
function_name = function_names[i]
|
||||
row = rows[i]
|
||||
tdSql.query(f'select {function_name}(tbname) from {self.ntbname}')
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, 0)
|
||||
|
@ -158,7 +168,7 @@ class TDTestCase:
|
|||
tdSql.query(f'select sum(1),max(c2),min(1),leastsquares(c1,1,1) from {self.ntbname}')
|
||||
tdSql.checkRows(0)
|
||||
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.ntbname} group by tbname')
|
||||
tdSql.checkRows(0)
|
||||
tdSql.checkRows(row)
|
||||
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.ntbname} group by c1')
|
||||
tdSql.checkRows(0)
|
||||
tdSql.query(f'select {function_name}(1) from (select {function_name}(c1),sum(c1) from {self.ntbname} group by c1)')
|
||||
|
@ -170,10 +180,11 @@ class TDTestCase:
|
|||
tdSql.checkRows(0)
|
||||
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.ntbname} partition by c1 interval(1s)')
|
||||
tdSql.checkRows(0)
|
||||
tdSql.query(f'select count(1),sum(1) from (select count(1) from {self.ntbname} group by tbname)')
|
||||
tdSql.query(f'select count(1),sum(1) from (select count(1) from {self.ntbname} group by tbname order by tbname)')
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, 0)
|
||||
tdSql.checkData(0, 1, None)
|
||||
|
||||
def count_query_stb(self,column_dict,tag_dict,stbname,tbnum,rownum):
|
||||
tdSql.query(f'select count(tbname) from {stbname}')
|
||||
tdSql.checkEqual(tdSql.queryResult[0][0],tbnum*rownum)
|
||||
|
|
|
@ -0,0 +1,183 @@
|
|||
# author : bobliu
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
|
||||
class TDTestCase:
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor())
|
||||
|
||||
self.row_nums = 10
|
||||
self.tb_nums = 10
|
||||
self.ts = 1537146000000
|
||||
self.dbname = "db"
|
||||
self.stable = "stb"
|
||||
|
||||
def prepare_db(self):
|
||||
tdSql.execute(f" use {self.dbname} ")
|
||||
tdSql.execute(f" create stable {self.dbname}.{self.stable} (ts timestamp , c1 int , c2 bigint , c3 float , c4 double , c5 smallint , c6 tinyint , c7 bool , c8 binary(36) , c9 nchar(36) , uc1 int unsigned,\
|
||||
uc2 bigint unsigned ,uc3 smallint unsigned , uc4 tinyint unsigned ) tags(t1 timestamp , t2 int , t3 bigint , t4 float , t5 double , t6 smallint , t7 tinyint , t8 bool , t9 binary(36)\
|
||||
, t10 nchar(36) , t11 int unsigned , t12 bigint unsigned ,t13 smallint unsigned , t14 tinyint unsigned ) ")
|
||||
|
||||
for i in range(self.tb_nums):
|
||||
tbname = f"{self.dbname}.sub_{self.stable}_{i}"
|
||||
ts = self.ts + i*10000
|
||||
tdSql.execute(f"create table {tbname} using {self.dbname}.{self.stable} tags ({ts} , {i} , {i}*10 ,{i}*1.0,{i}*1.0 , 1 , 2, 'true', 'binary_{i}' ,'nchar_{i}',{i},{i},10,20 )")
|
||||
|
||||
def insert_db(self, tb_nums, row_nums):
|
||||
for i in range(tb_nums):
|
||||
tbname = f"{self.dbname}.sub_{self.stable}_{i}"
|
||||
ts_base = self.ts + i*10000
|
||||
for row in range(row_nums):
|
||||
ts = ts_base + row*1000
|
||||
tdSql.execute(f"insert into {tbname} values({ts} , {row} , {row} , {row} , {row} , 1 , 2 , 'true' , 'binary_{row}' , 'nchar_{row}' , {row} , {row} , 1 ,2 )")
|
||||
|
||||
|
||||
def test_groupby(self, check_num, real_num):
|
||||
# tbname
|
||||
tdSql.query(f"select count(*) from {self.dbname}.{self.stable} group by tbname ")
|
||||
tdSql.checkRows(check_num)
|
||||
|
||||
tdSql.query(f"select count(*), sum(1) from {self.dbname}.{self.stable} group by tbname ")
|
||||
tdSql.checkRows(check_num)
|
||||
|
||||
tdSql.query(f"select tbname, count(*) from {self.dbname}.{self.stable} group by tbname ")
|
||||
tdSql.checkRows(check_num)
|
||||
|
||||
# having filter out empty
|
||||
tdSql.query(f"select tbname, count(*) from {self.dbname}.{self.stable} group by tbname having count(*) <= 0")
|
||||
tdSql.checkRows(check_num - real_num)
|
||||
|
||||
# tag
|
||||
tdSql.query(f"select count(*) from {self.dbname}.{self.stable} group by t2 ")
|
||||
tdSql.checkRows(check_num)
|
||||
|
||||
tdSql.query(f"select t2, count(*) from {self.dbname}.{self.stable} group by t2 ")
|
||||
tdSql.checkRows(check_num)
|
||||
|
||||
# having
|
||||
tdSql.query(f"select t2, count(*) from {self.dbname}.{self.stable} group by t2 having count(*) <= 0")
|
||||
tdSql.checkRows(check_num - real_num)
|
||||
|
||||
# col where filter nothing
|
||||
# tdSql.query(f"select t2, count(*) from {self.dbname}.{self.stable} where ts < now group by t2 ")
|
||||
# tdSql.checkRows(check_num)
|
||||
|
||||
############### same with old ###############
|
||||
# col where filter all
|
||||
# tdSql.query(f"select t2, count(*) from {self.dbname}.{self.stable} where ts > 1737146000000 group by t2 ")
|
||||
# tdSql.checkRows(0)
|
||||
|
||||
# col where filter part
|
||||
tdSql.query(f"select t2, count(*) from {self.dbname}.{self.stable} where c1 = 1 group by t2 ")
|
||||
tdSql.checkRows(real_num)
|
||||
|
||||
# col
|
||||
tdSql.query(f"select count(c1) from {self.dbname}.{self.stable} group by tbname ")
|
||||
tdSql.checkRows(real_num)
|
||||
|
||||
# count + sum(col)
|
||||
tdSql.query(f"select count(*), sum(c1) from {self.dbname}.{self.stable} group by tbname ")
|
||||
tdSql.checkRows(real_num)
|
||||
|
||||
tdSql.query(f"select c1, count(*) from {self.dbname}.{self.stable} group by c1 ")
|
||||
num = 0
|
||||
if real_num > 0:
|
||||
num = self.row_nums
|
||||
tdSql.checkRows(num)
|
||||
|
||||
tdSql.query(f"select ts, count(*) from {self.dbname}.{self.stable} group by ts ")
|
||||
tdSql.checkRows(real_num * self.row_nums)
|
||||
|
||||
# col + tag
|
||||
tdSql.query(f"select t2, c1, count(*) from {self.dbname}.{self.stable} group by t2, c1 ")
|
||||
tdSql.checkRows(real_num * self.row_nums)
|
||||
|
||||
|
||||
def test_partitionby(self, check_num, real_num):
|
||||
tdSql.query(f"select tbname , count(*) from {self.dbname}.{self.stable} partition by tbname ")
|
||||
tdSql.checkRows(check_num)
|
||||
|
||||
tdSql.query(f"select tbname, count(*) from {self.dbname}.{self.stable} partition by tbname ")
|
||||
tdSql.checkRows(check_num)
|
||||
|
||||
# having filter out empty
|
||||
tdSql.query(f"select tbname, count(*) from {self.dbname}.{self.stable} partition by tbname having count(*) <= 0")
|
||||
tdSql.checkRows(check_num - real_num)
|
||||
|
||||
#tag
|
||||
tdSql.query(f"select count(*) from {self.dbname}.{self.stable} partition by t2 ")
|
||||
tdSql.checkRows(check_num)
|
||||
|
||||
tdSql.query(f"select t2, count(*) from {self.dbname}.{self.stable} partition by t2 ")
|
||||
tdSql.checkRows(check_num)
|
||||
|
||||
# having
|
||||
tdSql.query(f"select t2, count(*) from {self.dbname}.{self.stable} partition by t2 having count(*) <= 0")
|
||||
tdSql.checkRows(check_num - real_num)
|
||||
|
||||
# col where filter nothing
|
||||
# tdSql.query(f"select t2, count(*) from {self.dbname}.{self.stable} where ts < now partition by t2 ")
|
||||
# tdSql.checkRows(check_num)
|
||||
|
||||
############### same with old ###############
|
||||
# col where filter all
|
||||
# tdSql.query(f"select t2, count(*) from {self.dbname}.{self.stable} where ts > 1737146000000 partition by t2 ")
|
||||
# tdSql.checkRows(0)
|
||||
|
||||
# col where filter part
|
||||
tdSql.query(f"select t2, count(*) from {self.dbname}.{self.stable} where c1 = 1 partition by t2 ")
|
||||
tdSql.checkRows(real_num)
|
||||
|
||||
#col
|
||||
tdSql.query(f"select count(c1) from {self.dbname}.{self.stable} partition by tbname ")
|
||||
tdSql.checkRows(real_num)
|
||||
|
||||
tdSql.query(f"select c1, count(*) from {self.dbname}.{self.stable} partition by c1 ")
|
||||
num = 0
|
||||
if real_num > 0:
|
||||
num = self.row_nums
|
||||
tdSql.checkRows(num)
|
||||
|
||||
tdSql.query(f"select ts, count(*) from {self.dbname}.{self.stable} partition by ts ")
|
||||
tdSql.checkRows(real_num * self.row_nums)
|
||||
|
||||
tdSql.query(f"select t2, c1, count(*) from {self.dbname}.{self.stable} partition by t2, c1 ")
|
||||
tdSql.checkRows(real_num * self.row_nums)
|
||||
|
||||
def test_error(self):
|
||||
tdSql.error(f"select * from {self.dbname}.{self.stable} group by t2")
|
||||
tdSql.error(f"select t2, count(*) from {self.dbname}.{self.stable} group by t2 where t2 = 1")
|
||||
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
self.prepare_db()
|
||||
check_num = self.tb_nums
|
||||
self.test_groupby(check_num, 0)
|
||||
self.test_partitionby(check_num, 0)
|
||||
# insert into half of tables
|
||||
real_num = 5
|
||||
self.insert_db(real_num, self.row_nums)
|
||||
self.test_groupby(check_num, real_num)
|
||||
self.test_partitionby(check_num, real_num)
|
||||
|
||||
# test old version before changed
|
||||
# self.test_groupby(0, 0)
|
||||
# self.test_partitionby(0, 0)
|
||||
# self.insert_db(5, self.row_nums)
|
||||
# self.test_groupby(5, 5)
|
||||
# self.test_partitionby(5, 5)
|
||||
|
||||
self.test_error()
|
||||
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -1112,13 +1112,13 @@ class TDTestCase:
|
|||
def TS_3932(self):
|
||||
tdLog.debug("test insert data into stable")
|
||||
tdSql.query(f"select tbname,count(*) from nested.stable_1 group by tbname order by tbname;")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkRows(6)
|
||||
tdSql.checkData(0, 1, 100)
|
||||
tdSql.checkData(1, 1, 200)
|
||||
|
||||
tdSql.query(f"insert into nested.stable_1 (ts,tbname) values(now,'stable_1_1');")
|
||||
tdSql.query(f"select tbname,count(*) from nested.stable_1 group by tbname order by tbname;")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkRows(6)
|
||||
tdSql.checkData(0, 1, 101)
|
||||
tdSql.checkData(1, 1, 200)
|
||||
|
||||
|
@ -1127,7 +1127,7 @@ class TDTestCase:
|
|||
coulmn_name = qlist[i]
|
||||
tdSql.execute(f"insert into nested.stable_1 (ts, tbname, {coulmn_name}) values(now+{i}s,'stable_1_1',1);")
|
||||
tdSql.query(f"select tbname,count(*) from nested.stable_1 group by tbname order by tbname;",queryTimes=5)
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkRows(6)
|
||||
tdSql.checkData(0, 1, 111)
|
||||
tdSql.checkData(1, 1, 200)
|
||||
|
||||
|
@ -1136,7 +1136,7 @@ class TDTestCase:
|
|||
coulmn_name = q_null_list[i]
|
||||
tdSql.execute(f"insert into nested.stable_1 (ts, tbname, {coulmn_name}) values(now+{i}s,'stable_1_1',1);")
|
||||
tdSql.query(f"select tbname,count(*) from nested.stable_1 group by tbname order by tbname;",queryTimes=5)
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkRows(6)
|
||||
tdSql.checkData(0, 1, 121)
|
||||
tdSql.checkData(1, 1, 200)
|
||||
|
||||
|
@ -1184,7 +1184,7 @@ class TDTestCase:
|
|||
def TS_3932_flushdb(self):
|
||||
tdLog.debug("test flush db and insert data into stable")
|
||||
tdSql.query(f"select tbname,count(*) from nested.stable_1 group by tbname order by tbname;")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkRows(6)
|
||||
tdSql.checkData(0, 1, 121)
|
||||
tdSql.checkData(1, 1, 200)
|
||||
|
||||
|
@ -1192,7 +1192,7 @@ class TDTestCase:
|
|||
q_null_list = ['q_int_null', 'q_bigint_null', 'q_smallint_null', 'q_tinyint_null', 'q_float_null', 'q_double_null', 'q_bool_null', 'q_binary_null', 'q_nchar_null', 'q_ts_null']
|
||||
tdSql.query(f"insert into nested.stable_1 (ts,tbname) values(now,'stable_1_1');")
|
||||
tdSql.query(f"select tbname,count(*) from nested.stable_1 group by tbname order by tbname;")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkRows(6)
|
||||
tdSql.checkData(0, 1, 122)
|
||||
tdSql.checkData(1, 1, 200)
|
||||
|
||||
|
@ -1200,7 +1200,7 @@ class TDTestCase:
|
|||
coulmn_name = qlist[i]
|
||||
tdSql.execute(f"insert into nested.stable_1 (ts, tbname, {coulmn_name}) values(now+{i}s,'stable_1_1',1);")
|
||||
tdSql.query(f"select tbname,count(*) from nested.stable_1 group by tbname order by tbname;")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkRows(6)
|
||||
tdSql.checkData(0, 1, 132)
|
||||
tdSql.checkData(1, 1, 200)
|
||||
|
||||
|
@ -1208,7 +1208,7 @@ class TDTestCase:
|
|||
coulmn_name = q_null_list[i]
|
||||
tdSql.execute(f"insert into nested.stable_1 (ts, tbname, {coulmn_name}) values(now+{i}s,'stable_1_1',1);")
|
||||
tdSql.query(f"select tbname,count(*) from nested.stable_1 group by tbname order by tbname;")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkRows(6)
|
||||
tdSql.checkData(0, 1, 142)
|
||||
tdSql.checkData(1, 1, 200)
|
||||
|
||||
|
@ -1223,7 +1223,7 @@ class TDTestCase:
|
|||
nested.stable_1 (ts,tbname,q_nchar) values(now+8a,'stable_1_1',1)\
|
||||
nested.stable_1 (ts,tbname,q_ts) values(now+9a,'stable_1_1',1);")
|
||||
tdSql.query(f"select tbname,count(*) from nested.stable_1 group by tbname order by tbname;")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkRows(6)
|
||||
tdSql.checkData(0, 1, 152);
|
||||
tdSql.checkData(1, 1, 200);
|
||||
|
||||
|
@ -1330,7 +1330,7 @@ class TDTestCase:
|
|||
nested.stable_null_childtable (ts,tbname,q_ts) values(now+9a,'stable_null_childtable_1',1);")
|
||||
|
||||
tdSql.query(f"select tbname,count(*) from nested.stable_1 group by tbname order by tbname;")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkRows(6)
|
||||
tdSql.checkData(0, 1, 162);
|
||||
tdSql.checkData(1, 1, 200);
|
||||
|
||||
|
@ -1349,7 +1349,7 @@ class TDTestCase:
|
|||
nested.stable_null_childtable (ts,tbname,q_int) values(now,'$^%$%^&',1);")
|
||||
|
||||
tdSql.query(f"select tbname,count(*) from nested.stable_1 group by tbname order by tbname;")
|
||||
tdSql.checkRows(3)
|
||||
tdSql.checkRows(7)
|
||||
tdSql.checkData(0, 1, 1);
|
||||
tdSql.checkData(1, 1, 162);
|
||||
tdSql.checkData(2, 1, 200);
|
||||
|
@ -1387,7 +1387,7 @@ class TDTestCase:
|
|||
nested.stable_null_childtable(tbname,ts,q_int,q_binary) file '{self.testcasePath}/stable_null_childtable.csv';")
|
||||
|
||||
tdSql.query(f"select tbname,count(*) from nested.stable_1 group by tbname order by tbname;")
|
||||
tdSql.checkRows(3)
|
||||
tdSql.checkRows(7)
|
||||
tdSql.checkData(0, 1, 1);
|
||||
tdSql.checkData(1, 1, 162);
|
||||
tdSql.checkData(2, 1, 200);
|
||||
|
@ -1423,7 +1423,7 @@ class TDTestCase:
|
|||
tdSql.query(f"insert into nested.stable_null_childtable(tbname,ts,q_int,q_binary) file '{self.testcasePath}/stable_null_childtable.csv';")
|
||||
|
||||
tdSql.query(f"select tbname,count(*) from nested.stable_1 group by tbname order by tbname;")
|
||||
tdSql.checkRows(3)
|
||||
tdSql.checkRows(7)
|
||||
tdSql.checkData(0, 1, 1);
|
||||
tdSql.checkData(1, 1, 162);
|
||||
tdSql.checkData(2, 1, 200);
|
||||
|
|
Loading…
Reference in New Issue