Merge branch '3.0' into enh/refactorBackend
This commit is contained in:
commit
f74edc7618
Binary file not shown.
|
@ -38,11 +38,16 @@ Aggregation by time window is supported in TDengine. For example, in the case wh
|
|||
window_clause: {
|
||||
SESSION(ts_col, tol_val)
|
||||
| STATE_WINDOW(col)
|
||||
| INTERVAL(interval [, offset]) [SLIDING sliding] [FILL({NONE | VALUE | PREV | NULL | LINEAR | NEXT})]
|
||||
| INTERVAL(interval_val [, offset]) [SLIDING (sliding_value)] [FILL({NONE | VALUE | PREV | NULL | LINEAR | NEXT})]
|
||||
| EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition
|
||||
}
|
||||
```
|
||||
|
||||
Both interval_val and sliding_value are time durations which have 3 forms of representation.
|
||||
- INTERVAL(1s, 500a) SLIDING(1s), the unit char should be any one of a (millisecond), b (nanosecond), d (day), h (hour), m (minute), n (month), s (second), u (microsecond), w (week), y (year).
|
||||
- INTERVAL(1000, 500) SLIDING(1000), the unit will the same as the queried database, if there are more than one databases, higher precision will be used.
|
||||
- INTERVAL('1s', '500a') SLIDING('1s'), unit must be specified, no spaces allowed.
|
||||
|
||||
The following restrictions apply:
|
||||
|
||||
### Other Rules
|
||||
|
|
|
@ -44,7 +44,11 @@ window_clause: {
|
|||
}
|
||||
```
|
||||
|
||||
在上述语法中的具体限制如下
|
||||
其中,interval_val 和 sliding_val 都表示时间段, 语法上支持三种方式,举例说明如下:
|
||||
- INTERVAL(1s, 500a) SLIDING(1s), 自带时间单位的形式,其中的时间单位是单字符表示, 分别为: a (毫秒), b (纳秒), d (天), h (小时), m (分钟), n (月), s (秒), u (微妙), w (周), y (年).
|
||||
- INTERVAL(1000, 500) SLIDING(1000), 不带时间单位的形式,将使用查询库的时间精度作为默认时间单位,当存在多个库时默认采用精度更高的库.
|
||||
- INTERVAL('1s', '500a') SLIDING('1s'), 自带时间单位的字符串形式,字符串内部不能有任何空格等其它字符.
|
||||
|
||||
|
||||
### 窗口子句的规则
|
||||
|
||||
|
|
|
@ -28,13 +28,16 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
|
|||
// bool allNullRow = true;
|
||||
|
||||
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
|
||||
uint64_t ts = 0;
|
||||
SFirstLastRes* p;
|
||||
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
|
||||
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[i]);
|
||||
int32_t slotId = slotIds[i];
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
|
||||
p = (SFirstLastRes*)varDataVal(pRes[i]);
|
||||
|
||||
p->ts = pColVal->ts;
|
||||
ts = p->ts;
|
||||
p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
|
||||
// allNullRow = p->isNull & allNullRow;
|
||||
|
||||
|
@ -55,6 +58,20 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
|
|||
varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE);
|
||||
colDataSetVal(pColInfoData, numOfRows, (const char*)pRes[i], false);
|
||||
}
|
||||
for (int32_t idx = 0; idx < taosArrayGetSize(pBlock->pDataBlock); ++idx) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx);
|
||||
if (pCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID && pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
colDataSetVal(pCol, numOfRows, (const char*)&ts, false);
|
||||
continue;
|
||||
}
|
||||
if (pReader->numOfCols == 1 && dstSlotIds[0] != idx) {
|
||||
if (!p->isNull) {
|
||||
colDataSetVal(pCol, numOfRows, p->buf, false);
|
||||
} else {
|
||||
colDataSetNULL(pCol, numOfRows);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// pBlock->info.rows += allNullRow ? 0 : 1;
|
||||
++pBlock->info.rows;
|
||||
|
|
|
@ -40,8 +40,10 @@
|
|||
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
|
||||
|
||||
typedef struct SGroupResInfo {
|
||||
int32_t index;
|
||||
SArray* pRows; // SArray<SResKeyPos>
|
||||
int32_t index; // rows consumed in func:doCopyToSDataBlockXX
|
||||
int32_t iter; // relate to index-1, last consumed data's slot id in hash table
|
||||
void* dataPos; // relate to index-1, last consumed data's position, in the nodelist of cur slot
|
||||
SArray* pRows; // SArray<SResKeyPos>
|
||||
char* pBuf;
|
||||
bool freeItem;
|
||||
} SGroupResInfo;
|
||||
|
|
|
@ -679,6 +679,12 @@ void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
|
|||
void doBuildResultDatablock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
||||
SDiskbasedBuf* pBuf);
|
||||
|
||||
/**
|
||||
* @brief copydata from hash table, instead of copying from SGroupResInfo's pRow
|
||||
*/
|
||||
int32_t doCopyToSDataBlockByHash(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
||||
SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t threshold, bool ignoreGroup);
|
||||
|
||||
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
|
||||
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo);
|
||||
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
|
||||
|
|
|
@ -191,9 +191,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
|||
SSDataBlock* pRes = pInfo->pRes;
|
||||
|
||||
if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
|
||||
SColMatchItem* pMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i);
|
||||
int32_t slotId = pMatchInfo->dstSlotId;
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferredRes->pDataBlock); ++i) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pInfo->pBufferredRes->pDataBlock, i);
|
||||
int32_t slotId = pCol->info.slotId;
|
||||
|
||||
SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId);
|
||||
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId);
|
||||
|
@ -201,8 +201,10 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
|||
if (colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes)) {
|
||||
colDataSetNULL(pDst, 0);
|
||||
} else {
|
||||
char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes);
|
||||
colDataSetVal(pDst, 0, p, false);
|
||||
if (pSrc->pData) {
|
||||
char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes);
|
||||
colDataSetVal(pDst, 0, p, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -655,6 +655,85 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t doCopyToSDataBlockByHash(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
||||
SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t threshold,
|
||||
bool ignoreGroup) {
|
||||
SExprInfo* pExprInfo = pSup->pExprInfo;
|
||||
int32_t numOfExprs = pSup->numOfExprs;
|
||||
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
|
||||
SqlFunctionCtx* pCtx = pSup->pCtx;
|
||||
|
||||
size_t keyLen = 0;
|
||||
int32_t numOfRows = tSimpleHashGetSize(pHashmap);
|
||||
|
||||
// begin from last iter
|
||||
void* pData = pGroupResInfo->dataPos;
|
||||
int32_t iter = pGroupResInfo->iter;
|
||||
while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
|
||||
void* key = tSimpleHashGetKey(pData, &keyLen);
|
||||
SResultRowPosition* pos = pData;
|
||||
uint64_t groupId = *(uint64_t*)key;
|
||||
|
||||
SFilePage* page = getBufPage(pBuf, pos->pageId);
|
||||
if (page == NULL) {
|
||||
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
|
||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
}
|
||||
|
||||
SResultRow* pRow = (SResultRow*)((char*)page + pos->offset);
|
||||
|
||||
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
|
||||
|
||||
// no results, continue to check the next one
|
||||
if (pRow->numOfRows == 0) {
|
||||
pGroupResInfo->index += 1;
|
||||
pGroupResInfo->iter = iter;
|
||||
pGroupResInfo->dataPos = pData;
|
||||
|
||||
releaseBufPage(pBuf, page);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!ignoreGroup) {
|
||||
if (pBlock->info.id.groupId == 0) {
|
||||
pBlock->info.id.groupId = groupId;
|
||||
} else {
|
||||
// current value belongs to different group, it can't be packed into one datablock
|
||||
if (pBlock->info.id.groupId != groupId) {
|
||||
releaseBufPage(pBuf, page);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
|
||||
uint32_t newSize = pBlock->info.rows + pRow->numOfRows + ((numOfRows - iter) > 1 ? 1 : 0);
|
||||
blockDataEnsureCapacity(pBlock, newSize);
|
||||
qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", newSize,
|
||||
pBlock->info.capacity, GET_TASKID(pTaskInfo));
|
||||
// todo set the pOperator->resultInfo size
|
||||
}
|
||||
|
||||
pGroupResInfo->index += 1;
|
||||
pGroupResInfo->iter = iter;
|
||||
pGroupResInfo->dataPos = pData;
|
||||
|
||||
copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
|
||||
|
||||
releaseBufPage(pBuf, page);
|
||||
pBlock->info.rows += pRow->numOfRows;
|
||||
if (pBlock->info.rows >= threshold) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
|
||||
pBlock->info.id.groupId);
|
||||
pBlock->info.dataLoad = 1;
|
||||
blockDataUpdateTsWindow(pBlock, 0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
||||
SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup) {
|
||||
SExprInfo* pExprInfo = pSup->pExprInfo;
|
||||
|
|
|
@ -370,6 +370,72 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
|
|||
return (pRes->info.rows == 0) ? NULL : pRes;
|
||||
}
|
||||
|
||||
bool hasRemainResultByHash(SOperatorInfo* pOperator) {
|
||||
SGroupbyOperatorInfo* pInfo = pOperator->info;
|
||||
SSHashObj* pHashmap = pInfo->aggSup.pResultRowHashTable;
|
||||
return pInfo->groupResInfo.index < tSimpleHashGetSize(pHashmap);
|
||||
}
|
||||
|
||||
void doBuildResultDatablockByHash(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
||||
SDiskbasedBuf* pBuf) {
|
||||
SGroupbyOperatorInfo* pInfo = pOperator->info;
|
||||
SSHashObj* pHashmap = pInfo->aggSup.pResultRowHashTable;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SSDataBlock* pBlock = pInfo->binfo.pRes;
|
||||
|
||||
// set output datablock version
|
||||
pBlock->info.version = pTaskInfo->version;
|
||||
|
||||
blockDataCleanup(pBlock);
|
||||
if (!hasRemainResultByHash(pOperator)) {
|
||||
return;
|
||||
}
|
||||
|
||||
pBlock->info.id.groupId = 0;
|
||||
if (!pInfo->binfo.mergeResultBlock) {
|
||||
doCopyToSDataBlockByHash(pTaskInfo, pBlock, &pOperator->exprSupp, pInfo->aggSup.pResultBuf, &pInfo->groupResInfo,
|
||||
pHashmap, pOperator->resultInfo.threshold, false);
|
||||
} else {
|
||||
while (hasRemainResultByHash(pOperator)) {
|
||||
doCopyToSDataBlockByHash(pTaskInfo, pBlock, &pOperator->exprSupp, pInfo->aggSup.pResultBuf, &pInfo->groupResInfo,
|
||||
pHashmap, pOperator->resultInfo.threshold, true);
|
||||
if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
|
||||
break;
|
||||
}
|
||||
pBlock->info.id.groupId = 0;
|
||||
}
|
||||
|
||||
// clear the group id info in SSDataBlock, since the client does not need it
|
||||
pBlock->info.id.groupId = 0;
|
||||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* buildGroupResultDataBlockByHash(SOperatorInfo* pOperator) {
|
||||
SGroupbyOperatorInfo* pInfo = pOperator->info;
|
||||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||
|
||||
// after filter, if result block turn to null, get next from whole set
|
||||
while (1) {
|
||||
doBuildResultDatablockByHash(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
|
||||
doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
if (!hasRemainResultByHash(pOperator)) {
|
||||
setOperatorCompleted(pOperator);
|
||||
// clean hash after completed
|
||||
tSimpleHashCleanup(pInfo->aggSup.pResultRowHashTable);
|
||||
pInfo->aggSup.pResultRowHashTable = NULL;
|
||||
break;
|
||||
}
|
||||
if (pRes->info.rows > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
pOperator->resultInfo.totalRows += pRes->info.rows;
|
||||
return (pRes->info.rows == 0) ? NULL : pRes;
|
||||
}
|
||||
|
||||
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
|
@ -379,9 +445,10 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
|||
|
||||
SGroupbyOperatorInfo* pInfo = pOperator->info;
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
return buildGroupResultDataBlock(pOperator);
|
||||
return buildGroupResultDataBlockByHash(pOperator);
|
||||
}
|
||||
|
||||
SGroupResInfo* pGroupResInfo = &pInfo->groupResInfo;
|
||||
|
||||
int32_t order = pInfo->binfo.inputTsOrder;
|
||||
int64_t st = taosGetTimestampUs();
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
|
@ -425,10 +492,20 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
#endif
|
||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0);
|
||||
// initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0);
|
||||
if (pGroupResInfo->pRows != NULL) {
|
||||
taosArrayDestroy(pGroupResInfo->pRows);
|
||||
}
|
||||
if (pGroupResInfo->pBuf) {
|
||||
taosMemoryFree(pGroupResInfo->pBuf);
|
||||
pGroupResInfo->pBuf = NULL;
|
||||
}
|
||||
pGroupResInfo->index = 0;
|
||||
pGroupResInfo->iter = 0;
|
||||
pGroupResInfo->dataPos = NULL;
|
||||
|
||||
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
||||
return buildGroupResultDataBlock(pOperator);
|
||||
return buildGroupResultDataBlockByHash(pOperator);
|
||||
}
|
||||
|
||||
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo) {
|
||||
|
|
|
@ -2772,7 +2772,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "_cache_last",
|
||||
.type = FUNCTION_TYPE_CACHE_LAST,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC,
|
||||
.translateFunc = translateFirstLast,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
|
|
|
@ -2478,6 +2478,27 @@ static bool hasSuitableCache(int8_t cacheLastMode, bool hasLastRow, bool hasLast
|
|||
return false;
|
||||
}
|
||||
|
||||
/// @brief check if we can apply last row scan optimization
|
||||
/// @param lastColNum how many distinct last col specified
|
||||
/// @param lastColId only used when lastColNum equals 1, the col id of the only one last col
|
||||
/// @param selectNonPKColNum num of normal cols
|
||||
/// @param selectNonPKColId only used when selectNonPKColNum equals 1, the col id of the only one select col
|
||||
static bool lastRowScanOptCheckColNum(int32_t lastColNum, col_id_t lastColId,
|
||||
int32_t selectNonPKColNum, col_id_t selectNonPKColId) {
|
||||
// multi select non pk col + last func: select c1, c2, last(c1)
|
||||
if (selectNonPKColNum > 1 && lastColNum > 0) return false;
|
||||
|
||||
if (selectNonPKColNum == 1) {
|
||||
// select last(c1), last(c2), c1 ...
|
||||
// which is not possible currently
|
||||
if (lastColNum > 1) return false;
|
||||
|
||||
// select last(c1), c2 ...
|
||||
if (lastColNum == 1 && lastColId != selectNonPKColId) return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) ||
|
||||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
|
||||
|
@ -2493,9 +2514,10 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
|
|||
return false;
|
||||
}
|
||||
|
||||
bool hasLastFunc = false;
|
||||
bool hasSelectFunc = false;
|
||||
SNode* pFunc = NULL;
|
||||
bool hasNonPKSelectFunc = false;
|
||||
SNode* pFunc = NULL;
|
||||
int32_t lastColNum = 0, selectNonPKColNum = 0;
|
||||
col_id_t lastColId = -1, selectNonPKColId = -1;
|
||||
FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) {
|
||||
SFunctionNode* pAggFunc = (SFunctionNode*)pFunc;
|
||||
if (FUNCTION_TYPE_LAST == pAggFunc->funcType) {
|
||||
|
@ -2505,16 +2527,33 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
|
|||
if (pCol->colType != COLUMN_TYPE_COLUMN) {
|
||||
return false;
|
||||
}
|
||||
if (lastColId != pCol->colId) {
|
||||
lastColId = pCol->colId;
|
||||
lastColNum++;
|
||||
}
|
||||
}
|
||||
if (hasSelectFunc || QUERY_NODE_VALUE == nodeType(nodesListGetNode(pAggFunc->pParameterList, 0))) {
|
||||
if (QUERY_NODE_VALUE == nodeType(nodesListGetNode(pAggFunc->pParameterList, 0))) {
|
||||
return false;
|
||||
}
|
||||
hasLastFunc = true;
|
||||
if (!lastRowScanOptCheckColNum(lastColNum, lastColId, selectNonPKColNum, selectNonPKColId))
|
||||
return false;
|
||||
} else if (FUNCTION_TYPE_SELECT_VALUE == pAggFunc->funcType) {
|
||||
if (hasLastFunc) {
|
||||
SNode* pParam = nodesListGetNode(pAggFunc->pParameterList, 0);
|
||||
if (QUERY_NODE_COLUMN == nodeType(pParam)) {
|
||||
SColumnNode* pCol = (SColumnNode*)pParam;
|
||||
if (PRIMARYKEY_TIMESTAMP_COL_ID != pCol->colId) {
|
||||
if (selectNonPKColId != pCol->colId) {
|
||||
selectNonPKColId = pCol->colId;
|
||||
selectNonPKColNum++;
|
||||
}
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
} else if (lastColNum > 0) {
|
||||
return false;
|
||||
}
|
||||
hasSelectFunc = true;
|
||||
if (!lastRowScanOptCheckColNum(lastColNum, lastColId, selectNonPKColNum, selectNonPKColId))
|
||||
return false;
|
||||
} else if (FUNCTION_TYPE_GROUP_KEY == pAggFunc->funcType) {
|
||||
if (!lastRowScanOptLastParaIsTag(nodesListGetNode(pAggFunc->pParameterList, 0))) {
|
||||
return false;
|
||||
|
@ -2581,6 +2620,9 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
|
|||
|
||||
SLastRowScanOptSetColDataTypeCxt cxt = {.doAgg = true, .pLastCols = NULL};
|
||||
SNode* pNode = NULL;
|
||||
SColumnNode* pPKTsCol = NULL;
|
||||
SColumnNode* pNonPKCol = NULL;
|
||||
|
||||
FOREACH(pNode, pAgg->pAggFuncs) {
|
||||
SFunctionNode* pFunc = (SFunctionNode*)pNode;
|
||||
int32_t funcType = pFunc->funcType;
|
||||
|
@ -2597,6 +2639,16 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
|
|||
nodesWalkExpr(nodesListGetNode(pFunc->pParameterList, 0), lastRowScanOptSetColDataType, &cxt);
|
||||
nodesListErase(pFunc->pParameterList, nodesListGetCell(pFunc->pParameterList, 1));
|
||||
}
|
||||
} else if (FUNCTION_TYPE_SELECT_VALUE == funcType) {
|
||||
pNode = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (nodeType(pNode) == QUERY_NODE_COLUMN) {
|
||||
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||
if (pCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
pPKTsCol = pCol;
|
||||
} else {
|
||||
pNonPKCol = pCol;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2608,6 +2660,16 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
|
|||
lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols, true);
|
||||
nodesWalkExprs(pScan->pScanPseudoCols, lastRowScanOptSetColDataType, &cxt);
|
||||
lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols, false);
|
||||
if (pPKTsCol && pScan->node.pTargets->length == 1) {
|
||||
// when select last(ts),ts from ..., we add another ts to targets
|
||||
sprintf(pPKTsCol->colName, "#sel_val.%p", pPKTsCol);
|
||||
nodesListAppend(pScan->node.pTargets, nodesCloneNode((SNode*)pPKTsCol));
|
||||
}
|
||||
if (pNonPKCol && cxt.pLastCols->length == 1 && nodesEqualNode((SNode*)pNonPKCol, nodesListGetNode(cxt.pLastCols, 0))) {
|
||||
// when select last(c1), c1 from ..., we add c1 to targets
|
||||
sprintf(pNonPKCol->colName, "#sel_val.%p", pNonPKCol);
|
||||
nodesListAppend(pScan->node.pTargets, nodesCloneNode((SNode*)pNonPKCol));
|
||||
}
|
||||
nodesClearList(cxt.pLastCols);
|
||||
}
|
||||
pAgg->hasLastRow = false;
|
||||
|
|
|
@ -77,6 +77,7 @@ struct SStreamSnapHandle {
|
|||
|
||||
SArray* pDbSnapSet;
|
||||
int32_t currIdx;
|
||||
int8_t delFlag; // 0 : not del, 1: del
|
||||
};
|
||||
struct SStreamSnapBlockHdr {
|
||||
int8_t type;
|
||||
|
@ -163,7 +164,7 @@ void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t snapFileCvtMeta(SBackendSnapFile2* pSnapFile) {
|
||||
int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) {
|
||||
SBackendFileItem item = {0};
|
||||
item.ref = 1;
|
||||
// current
|
||||
|
@ -253,7 +254,7 @@ int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBacke
|
|||
if ((code = snapFileReadMeta(pSnapFile)) != 0) {
|
||||
goto _ERROR;
|
||||
}
|
||||
if ((code = snapFileCvtMeta(pSnapFile)) != 0) {
|
||||
if ((code = snapFileGenMeta(pSnapFile)) != 0) {
|
||||
goto _ERROR;
|
||||
}
|
||||
|
||||
|
@ -399,7 +400,6 @@ _NEXT:
|
|||
|
||||
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
|
||||
int64_t nread = taosPReadFile(pSnapFile->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset);
|
||||
|
||||
if (nread == -1) {
|
||||
taosMemoryFree(buf);
|
||||
code = TAOS_SYSTEM_ERROR(terrno);
|
||||
|
|
|
@ -369,6 +369,15 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p
|
|||
}
|
||||
|
||||
switch (pItem->dtype) {
|
||||
case CFG_DTYPE_BOOL: {
|
||||
int32_t ival = (int32_t)atoi(pVal);
|
||||
if (ival != 0 && ival != 1) {
|
||||
uError("cfg:%s, type:%s value:%d out of range[0, 1]", pItem->name,
|
||||
cfgDtypeStr(pItem->dtype), ival);
|
||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||
return -1;
|
||||
}
|
||||
} break;
|
||||
case CFG_DTYPE_INT32: {
|
||||
int32_t ival = (int32_t)atoi(pVal);
|
||||
if (ival < pItem->imin || ival > pItem->imax) {
|
||||
|
|
|
@ -69,6 +69,10 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/func_to_char_timestamp.py -Q 2
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/func_to_char_timestamp.py -Q 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/func_to_char_timestamp.py -Q 4
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_cache_scan.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_cache_scan.py -Q 2
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_cache_scan.py -Q 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_cache_scan.py -Q 4
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py
|
||||
|
@ -232,6 +236,8 @@ e
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/splitVGroupRep1.py -N 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/splitVGroupRep3.py -N 3
|
||||
,,n,system-test,python3 ./test.py -f 0-others/timeRangeWise.py -N 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/delete_check.py
|
||||
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_replica.py -N 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
|
||||
|
|
|
@ -133,8 +133,6 @@ class AutoGen:
|
|||
if batch_size == 1 or (i > 0 and i % batch_size == 0) :
|
||||
sql = f"insert into {child_name} values {values}"
|
||||
tdSql.execute(sql)
|
||||
if batch_size > 40:
|
||||
tdLog.info(f" insert data i={i}")
|
||||
values = ""
|
||||
|
||||
# end batch
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,154 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
|
||||
from util.log import *
|
||||
from util.cases import *
|
||||
from util.sql import *
|
||||
from util.common import *
|
||||
from util.sqlset import *
|
||||
from util.autogen import *
|
||||
|
||||
import random
|
||||
import time
|
||||
import traceback
|
||||
import os
|
||||
from os import path
|
||||
|
||||
|
||||
class TDTestCase:
|
||||
# init
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor(), True)
|
||||
|
||||
# autoGen
|
||||
self.autoGen = AutoGen()
|
||||
# init cluster path
|
||||
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||
if ("community" in selfPath):
|
||||
projPath = selfPath[:selfPath.find("community")]
|
||||
else:
|
||||
projPath = selfPath[:selfPath.find("tests")]
|
||||
self.projDir = f"{projPath}sim/"
|
||||
tdLog.info(f" init projPath={self.projDir}")
|
||||
|
||||
def compactDatbase(self):
|
||||
# compact database
|
||||
tdSql.execute(f"compact database {self.dbname}", show=True)
|
||||
waitSeconds = 20
|
||||
if self.waitTranslation(waitSeconds) == False:
|
||||
tdLog.exit(f"translation can not finish after wait {waitSeconds} seconds")
|
||||
return
|
||||
|
||||
# check tsdb folder empty
|
||||
def check_tsdb_dir(self, tsdbDir):
|
||||
for vfile in os.listdir(tsdbDir):
|
||||
fileName, ext = os.path.splitext(vfile)
|
||||
pathFile = os.path.join(tsdbDir, vfile)
|
||||
ext = ext.lower()
|
||||
tdLog.info(f"check exist file {pathFile} ...")
|
||||
if ext == ".head" or ext == ".data" or ext == ".stt" or ext == ".sma":
|
||||
tdLog.info(f"found {pathFile} not to be deleted ...")
|
||||
real = True
|
||||
for i in range(50):
|
||||
tdLog.info(f"i={i} compact and try again ...")
|
||||
self.compactDatbase()
|
||||
if os.path.exists(pathFile) is False:
|
||||
real = False
|
||||
break
|
||||
else:
|
||||
time.sleep(0.5)
|
||||
tdLog.info(f"file real exist {pathFile} , sleep 500ms and try")
|
||||
|
||||
if real is False:
|
||||
continue
|
||||
fileStat = os.stat(pathFile)
|
||||
tdLog.exit(f" check file can not be deleted. file={pathFile} file size={fileStat.st_size}")
|
||||
|
||||
return True
|
||||
|
||||
# check vnode tsdb folder empty
|
||||
def check_filedelete(self):
|
||||
# put all vnode to list
|
||||
for dnode in os.listdir(self.projDir):
|
||||
vnodesDir = self.projDir + f"{dnode}/data/vnode/"
|
||||
if os.path.isdir(vnodesDir) == False or dnode[:5] != "dnode":
|
||||
continue
|
||||
print(f"vnodesDir={vnodesDir}")
|
||||
# enum all vnode
|
||||
for vnode in os.listdir(vnodesDir):
|
||||
vnodeDir = path.join(vnodesDir, vnode)
|
||||
print(f"vnodeDir={vnodeDir}")
|
||||
if os.path.isdir(vnodesDir):
|
||||
tsdbDir = path.join(vnodeDir, "tsdb")
|
||||
if path.exists(tsdbDir) :
|
||||
self.check_tsdb_dir(tsdbDir)
|
||||
|
||||
def waitTranslation(self, waitSeconds):
|
||||
# wait end
|
||||
for i in range(waitSeconds):
|
||||
sql ="show transactions;"
|
||||
rows = tdSql.query(sql)
|
||||
if rows == 0:
|
||||
return True
|
||||
tdLog.info(f"i={i} wait for translation finish ...")
|
||||
time.sleep(1)
|
||||
|
||||
return False
|
||||
|
||||
# run
|
||||
def run(self):
|
||||
# seed
|
||||
random.seed(int(time.time()))
|
||||
self.dbname = "deletecheck"
|
||||
stbname = "meters"
|
||||
childname= "d"
|
||||
child_cnt = 2
|
||||
batch_size = 8000
|
||||
insert_rows = 100015
|
||||
start_ts = 1600000000000
|
||||
|
||||
self.autoGen.create_db(self.dbname)
|
||||
|
||||
loop = 3
|
||||
for i in range(loop):
|
||||
self.autoGen.create_stable(stbname, 4, 10, 4, 8)
|
||||
self.autoGen.create_child(stbname, childname, child_cnt)
|
||||
self.autoGen.set_batch_size(batch_size)
|
||||
self.autoGen.insert_data(insert_rows)
|
||||
self.autoGen.set_start_ts(start_ts)
|
||||
|
||||
if i % 2 == 1:
|
||||
tdSql.execute(f"flush database {self.dbname}", show=True)
|
||||
|
||||
# drop stable
|
||||
tdSql.execute(f"drop table {self.dbname}.{stbname} ", show = True)
|
||||
|
||||
self.compactDatbase()
|
||||
|
||||
# check file delete
|
||||
self.check_filedelete()
|
||||
tdLog.info(f"loop = {i+1} / {loop} check file delete ok after drop table successfully.")
|
||||
|
||||
start_ts += i*100000000
|
||||
|
||||
|
||||
# stop
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -0,0 +1,298 @@
|
|||
import taos
|
||||
import sys
|
||||
import time
|
||||
import socket
|
||||
import os
|
||||
import threading
|
||||
import math
|
||||
from datetime import datetime
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
from util.common import *
|
||||
# from tmqCommon import *
|
||||
|
||||
COMPARE_DATA = 0
|
||||
COMPARE_LEN = 1
|
||||
|
||||
class TDTestCase:
|
||||
def __init__(self):
|
||||
self.vgroups = 4
|
||||
self.ctbNum = 10
|
||||
self.rowsPerTbl = 10000
|
||||
self.duraion = '1h'
|
||||
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor(), False)
|
||||
|
||||
def create_database(self,tsql, dbName,dropFlag=1,vgroups=2,replica=1, duration:str='1d'):
|
||||
if dropFlag == 1:
|
||||
tsql.execute("drop database if exists %s"%(dbName))
|
||||
|
||||
tsql.execute("create database if not exists %s vgroups %d replica %d duration %s CACHEMODEL 'both'"%(dbName, vgroups, replica, duration))
|
||||
tdLog.debug("complete to create database %s"%(dbName))
|
||||
return
|
||||
|
||||
def create_stable(self,tsql, paraDict):
|
||||
colString = tdCom.gen_column_type_str(colname_prefix=paraDict["colPrefix"], column_elm_list=paraDict["colSchema"])
|
||||
tagString = tdCom.gen_tag_type_str(tagname_prefix=paraDict["tagPrefix"], tag_elm_list=paraDict["tagSchema"])
|
||||
sqlString = f"create table if not exists %s.%s (%s) tags (%s)"%(paraDict["dbName"], paraDict["stbName"], colString, tagString)
|
||||
tdLog.debug("%s"%(sqlString))
|
||||
tsql.execute(sqlString)
|
||||
return
|
||||
|
||||
def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1,ctbStartIdx=0):
|
||||
for i in range(ctbNum):
|
||||
sqlString = "create table %s.%s%d using %s.%s tags(%d, 'tb%d', 'tb%d', %d, %d, %d)" % \
|
||||
(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,(i+ctbStartIdx) % 5,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx)
|
||||
tsql.execute(sqlString)
|
||||
|
||||
tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName))
|
||||
return
|
||||
|
||||
def insert_data(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs,tsStep):
|
||||
tdLog.debug("start to insert data ............")
|
||||
tsql.execute("use %s" %dbName)
|
||||
pre_insert = "insert into "
|
||||
sql = pre_insert
|
||||
|
||||
for i in range(ctbNum):
|
||||
rowsBatched = 0
|
||||
sql += " %s%d values "%(ctbPrefix,i)
|
||||
for j in range(rowsPerTbl):
|
||||
if (i < ctbNum/2):
|
||||
sql += "(%d, %d, %d, %d,%d,%d,%d,true,'binary%d', 'nchar%d', %d) "%(startTs + j*tsStep, j%1000, j%500, j%1000, j%5000, j%5400, j%128, j%10000, j%1000, startTs+j*tsStep+1000)
|
||||
else:
|
||||
sql += "(%d, %d, NULL, %d,NULL,%d,%d,true,'binary%d', 'nchar%d', %d) "%(startTs + j*tsStep, j%1000, j%500, j%1000, j%128, j%10000, j%1000, startTs + j*tsStep + 1000)
|
||||
rowsBatched += 1
|
||||
if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
|
||||
tsql.execute(sql)
|
||||
rowsBatched = 0
|
||||
if j < rowsPerTbl - 1:
|
||||
sql = "insert into %s%d values " %(ctbPrefix,i)
|
||||
else:
|
||||
sql = "insert into "
|
||||
if sql != pre_insert:
|
||||
tsql.execute(sql)
|
||||
tdLog.debug("insert data ............ [OK]")
|
||||
return
|
||||
|
||||
def prepareTestEnv(self):
|
||||
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
|
||||
paraDict = {'dbName': 'test',
|
||||
'dropFlag': 1,
|
||||
'vgroups': 2,
|
||||
'stbName': 'meters',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':1},
|
||||
{'type': 'BIGINT', 'count':1},
|
||||
{'type': 'FLOAT', 'count':1},
|
||||
{'type': 'DOUBLE', 'count':1},
|
||||
{'type': 'smallint', 'count':1},
|
||||
{'type': 'tinyint', 'count':1},
|
||||
{'type': 'bool', 'count':1},
|
||||
{'type': 'binary', 'len':10, 'count':1},
|
||||
{'type': 'nchar', 'len':10, 'count':1},
|
||||
{'type': 'timestamp', 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'nchar', 'len':20, 'count':1},{'type': 'binary', 'len':20, 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'smallint', 'count':1},{'type': 'DOUBLE', 'count':1}],
|
||||
'ctbPrefix': 't',
|
||||
'ctbStartIdx': 0,
|
||||
'ctbNum': 100,
|
||||
'rowsPerTbl': 10000,
|
||||
'batchNum': 3000,
|
||||
'startTs': 1537146000000,
|
||||
'tsStep': 600000}
|
||||
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
tdLog.info("create database")
|
||||
self.create_database(tsql=tdSql, dbName=paraDict["dbName"], dropFlag=paraDict["dropFlag"], vgroups=paraDict["vgroups"], replica=self.replicaVar, duration=self.duraion)
|
||||
|
||||
tdLog.info("create stb")
|
||||
self.create_stable(tsql=tdSql, paraDict=paraDict)
|
||||
|
||||
tdLog.info("create child tables")
|
||||
self.create_ctable(tsql=tdSql, dbName=paraDict["dbName"], \
|
||||
stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],\
|
||||
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict["ctbStartIdx"])
|
||||
self.insert_data(tsql=tdSql, dbName=paraDict["dbName"],\
|
||||
ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],\
|
||||
rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],\
|
||||
startTs=paraDict["startTs"],tsStep=paraDict["tsStep"])
|
||||
return
|
||||
|
||||
def check_explain_res_has_row(self, plan_str_expect: str, rows, sql):
|
||||
plan_found = False
|
||||
for row in rows:
|
||||
if str(row).find(plan_str_expect) >= 0:
|
||||
tdLog.debug("plan: [%s] found in: [%s]" % (plan_str_expect, str(row)))
|
||||
plan_found = True
|
||||
break
|
||||
if not plan_found:
|
||||
tdLog.exit("plan: %s not found in res: [%s] in sql: %s" % (plan_str_expect, str(rows), sql))
|
||||
|
||||
def check_explain_res_no_row(self, plan_str_not_expect: str, res, sql):
|
||||
for row in res:
|
||||
if str(row).find(plan_str_not_expect) >= 0:
|
||||
tdLog.exit('plan: [%s] found in: [%s] for sql: %s' % (plan_str_not_expect, str(row), sql))
|
||||
|
||||
def explain_sql(self, sql: str):
|
||||
sql = "explain verbose true " + sql
|
||||
tdSql.query(sql, queryTimes=1)
|
||||
return tdSql.queryResult
|
||||
|
||||
def explain_and_check_res(self, sqls, hasLastRowScanRes):
|
||||
for sql, has_last in zip(sqls, hasLastRowScanRes):
|
||||
res = self.explain_sql(sql)
|
||||
if has_last == 1:
|
||||
self.check_explain_res_has_row("Last Row Scan", res, sql)
|
||||
else:
|
||||
self.check_explain_res_no_row("Last Row Scan", res, sql)
|
||||
|
||||
def format_sqls(self, sql_template, select_items):
|
||||
sqls = []
|
||||
for item in select_items:
|
||||
sqls.append(sql_template % item)
|
||||
return sqls
|
||||
|
||||
def query_check_one(self, sql, res_expect):
|
||||
if res_expect is not None:
|
||||
tdSql.query(sql, queryTimes=1)
|
||||
tdSql.checkRows(1)
|
||||
for i in range(0, tdSql.queryCols):
|
||||
tdSql.checkData(0, i, res_expect[i])
|
||||
tdLog.info('%s check res col: %d succeed value: %s' % (sql, i, str(res_expect[i])))
|
||||
|
||||
def query_check_sqls(self, sqls, has_last_row_scan_res, res_expect):
|
||||
for sql, has_last, res in zip(sqls, has_last_row_scan_res, res_expect):
|
||||
if has_last == 1:
|
||||
self.query_check_one(sql, res)
|
||||
|
||||
def test_last_cache_scan(self):
|
||||
sql_template = 'select %s from meters'
|
||||
select_items = [
|
||||
"last(ts), ts", "last(ts), c1", "last(ts), c2", "last(ts), c3",\
|
||||
"last(ts), c4", "last(ts), tbname", "last(ts), t1", "last(ts), ts, ts"]
|
||||
has_last_row_scan_res = [1, 0, 0, 0, 0, 0, 0, 1]
|
||||
res_expect = [
|
||||
["2018-11-25 19:30:00.000", "2018-11-25 19:30:00.000"],
|
||||
None, None, None, None, None, None,
|
||||
["2018-11-25 19:30:00.000", "2018-11-25 19:30:00.000", "2018-11-25 19:30:00.000"]
|
||||
]
|
||||
sqls = self.format_sqls(sql_template, select_items)
|
||||
self.explain_and_check_res(sqls, has_last_row_scan_res)
|
||||
self.query_check_sqls(sqls, has_last_row_scan_res, res_expect)
|
||||
|
||||
select_items = ["last(c1),ts", "last(c1), c1", "last(c1), c2", "last(c1), c3",\
|
||||
"last(c1), c4", "last(c1), tbname", "last(c1), t1", "last(c1), ts, ts", "last(c1), c1, c1"]
|
||||
has_last_row_scan_res = [1, 1, 0, 0, 0, 0, 0, 1, 1]
|
||||
res_expect = [
|
||||
[999, "2018-11-25 19:30:00.000"],
|
||||
[999, 999], None, None, None, None, None,
|
||||
[999, "2018-11-25 19:30:00.000", "2018-11-25 19:30:00.000"],
|
||||
[999,999,999]
|
||||
]
|
||||
sqls = self.format_sqls(sql_template, select_items)
|
||||
self.explain_and_check_res(sqls, has_last_row_scan_res)
|
||||
self.query_check_sqls(sqls, has_last_row_scan_res, res_expect)
|
||||
|
||||
sql_template = 'select %s from t1'
|
||||
select_items = ["last(c4),ts", "last(c4), c1", "last(c4), c2", "last(c4), c3",\
|
||||
"last(c4), c4", "last(c4), tbname", "last(c4), t1"]
|
||||
has_last_row_scan_res = [1, 0, 0, 0, 1, 0, 0]
|
||||
res_expect = [
|
||||
[4999.000000000000000, "2018-11-25 19:30:00.000"],
|
||||
None,None,None,
|
||||
[4999.000000000000000, 4999.000000000000000]
|
||||
]
|
||||
sqls = self.format_sqls(sql_template, select_items)
|
||||
self.explain_and_check_res(sqls, has_last_row_scan_res)
|
||||
self.query_check_sqls(sqls, has_last_row_scan_res, res_expect)
|
||||
|
||||
sql_template = 'select %s from meters'
|
||||
select_items = ["last(c8), ts", "last(c8), c1", "last(c8), c8", "last(c8), tbname", \
|
||||
"last(c8), t1", "last(c8), c8, c8", "last(c8), ts, ts"]
|
||||
has_last_row_scan_res = [1, 0, 1, 0, 0, 1, 1]
|
||||
res_expect = [
|
||||
["binary9999", "2018-11-25 19:30:00.000"],
|
||||
None,
|
||||
["binary9999", "binary9999"],
|
||||
None, None,
|
||||
["binary9999", "binary9999", "binary9999"],
|
||||
["binary9999", "2018-11-25 19:30:00.000", "2018-11-25 19:30:00.000"]
|
||||
]
|
||||
sqls = self.format_sqls(sql_template, select_items)
|
||||
self.explain_and_check_res(sqls, has_last_row_scan_res)
|
||||
self.query_check_sqls(sqls, has_last_row_scan_res, res_expect)
|
||||
|
||||
# c2, c4 in last row of t5,t6,t7,t8,t9 will always be NULL
|
||||
sql_template = 'select %s from t5'
|
||||
select_items = ["last(c4), ts", "last(c4), c4", "last(c4), c4, c4", "last(c4), ts, ts"]
|
||||
has_last_row_scan_res = [1,1,1,1]
|
||||
|
||||
sqls = self.format_sqls(sql_template, select_items)
|
||||
self.explain_and_check_res(sqls, has_last_row_scan_res)
|
||||
#for sql in sqls:
|
||||
#tdSql.query(sql, queryTimes=1)
|
||||
#tdSql.checkRows(0)
|
||||
|
||||
sql_template = 'select %s from meters'
|
||||
select_items = [
|
||||
"last_row(ts), last(ts)",
|
||||
"last_row(c1), last(c1)",
|
||||
"last_row(c1), c1,c3, ts"
|
||||
]
|
||||
has_last_row_scan_res = [0,0,1]
|
||||
sqls = self.format_sqls(sql_template, select_items)
|
||||
self.explain_and_check_res(sqls, has_last_row_scan_res)
|
||||
#res_expect = [None, None, [999, 999, 499, "2018-11-25 19:30:00.000"]]
|
||||
#self.query_check_sqls(sqls, has_last_row_scan_res, res_expect)
|
||||
|
||||
select_items = ["last(c10), c10",
|
||||
"last(c10), ts",
|
||||
"last(c10), c10, ts",
|
||||
"last(c10), c10, ts, c10,ts",
|
||||
"last(c10), ts, c1"]
|
||||
has_last_row_scan_res = [1,1,1,1,0]
|
||||
sqls = self.format_sqls(sql_template, select_items)
|
||||
self.explain_and_check_res(sqls, has_last_row_scan_res)
|
||||
res_expect = [
|
||||
["2018-11-25 19:30:01.000", "2018-11-25 19:30:01.000"],
|
||||
["2018-11-25 19:30:01.000", "2018-11-25 19:30:00.000"],
|
||||
["2018-11-25 19:30:01.000", "2018-11-25 19:30:01.000", "2018-11-25 19:30:00.000"],
|
||||
["2018-11-25 19:30:01.000", "2018-11-25 19:30:01.000", "2018-11-25 19:30:00.000", "2018-11-25 19:30:01.000", "2018-11-25 19:30:00.000"]
|
||||
]
|
||||
self.query_check_sqls(sqls, has_last_row_scan_res, res_expect)
|
||||
|
||||
sql = "select last(c1), c1, c1+1, c1+2, ts from meters"
|
||||
res = self.explain_sql(sql)
|
||||
self.check_explain_res_has_row("Last Row Scan", res, sql)
|
||||
|
||||
tdSql.query(sql)
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, 999)
|
||||
tdSql.checkData(0, 1, 999)
|
||||
tdSql.checkData(0, 2, 1000)
|
||||
tdSql.checkData(0, 3, 1001)
|
||||
tdSql.checkData(0, 4, "2018-11-25 19:30:00.000")
|
||||
|
||||
def run(self):
|
||||
self.prepareTestEnv()
|
||||
#time.sleep(99999999)
|
||||
self.test_last_cache_scan()
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
event = threading.Event()
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
Loading…
Reference in New Issue