diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index 7c23184d93..d5e9a2e625 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -103,6 +103,7 @@ int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam * int32_t maxScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t avgScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t stddevScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t leastSQRScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 5d3cfee592..66843d9a28 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -177,7 +177,6 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t saveOneRow(pRow, pResBlock, pr, slotIds); taosArrayPush(pTableUidList, &pKeyInfo->uid); - // taosMemoryFree(pRow); tsdbCacheRelease(lruCache, h); pr->tableIndex += 1; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 1a9e12c9ca..f0aea0cefb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -830,9 +830,8 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - uint8_t *pb = NULL, *pb1 = NULL; int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols, - pBlockData, &pb, &pb1); + pBlockData, NULL, NULL); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -3007,11 +3006,14 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData); if (code != TSDB_CODE_SUCCESS) { + tBlockDataClear(&pStatus->fileBlockData); + terrno = code; return NULL; } copyBlockDataToSDataBlock(pReader, pBlockScanInfo); + tBlockDataClear(&pStatus->fileBlockData); return pReader->pResBlock->pDataBlock; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 897373e6c8..0d57a6370c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -316,12 +316,16 @@ typedef struct STagScanInfo { typedef struct SLastrowScanInfo { SSDataBlock *pRes; - SArray *pTableList; SReadHandle readHandle; void *pLastrowReader; SArray *pColMatchInfo; int32_t *pSlotIds; SExprSupp pseudoExprSup; + int32_t retrieveType; + int32_t currentGroupIndex; + SSDataBlock *pBufferredRes; + SArray *pUidList; + int32_t indexOfBufferedRes; } SLastrowScanInfo; typedef enum EStreamScanMode { @@ -825,8 +829,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, - SArray* pTableList, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, @@ -944,8 +947,9 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); -int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, +int32_t createScanTableListInfo(SScanPhysiNode *pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId); + SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo *pTableListInfo, diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 9034397d0f..78a7e58ee1 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -30,15 +30,13 @@ static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator); static void destroyLastrowScanOperator(void* param, int32_t numOfOutput); static int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds); -SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SArray* pTableList, - SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->pTableList = pTableList; pInfo->readHandle = *readHandle; pInfo->pRes = createResDataBlock(pScanNode->scan.node.pOutputDataBlockDesc); @@ -50,8 +48,22 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead goto _error; } - tsdbLastRowReaderOpen(readHandle->vnode, LASTROW_RETRIEVE_TYPE_SINGLE, pTableList, taosArrayGetSize(pInfo->pColMatchInfo), - &pInfo->pLastrowReader); + STableListInfo* pTableList = &pTaskInfo->tableqinfoList; + + initResultSizeInfo(pOperator, 1024); + blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); + pInfo->pUidList = taosArrayInit(4, sizeof(int64_t)); + + // partition by tbname + if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) { + pInfo->retrieveType = LASTROW_RETRIEVE_TYPE_ALL; + tsdbLastRowReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList, + taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader); + pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false); + blockDataEnsureCapacity(pInfo->pBufferredRes, pOperator->resultInfo.capacity); + } else { // by tags + pInfo->retrieveType = LASTROW_RETRIEVE_TYPE_SINGLE; + } if (pScanNode->scan.pScanPseudoCols != NULL) { SExprSupp* pPseudoExpr = &pInfo->pseudoExprSup; @@ -60,19 +72,17 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead pPseudoExpr->pCtx = createSqlFunctionCtx(pPseudoExpr->pExprInfo, pPseudoExpr->numOfExprs, &pPseudoExpr->rowEntryInfoOffset); } - pOperator->name = "LastrowScanOperator"; + pOperator->name = "LastrowScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); - initResultSizeInfo(pOperator, 1024); - blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doScanLastrow, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL); + pOperator->cost.openCost = 0; return pOperator; @@ -90,43 +100,105 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { SLastrowScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - - int32_t size = taosArrayGetSize(pInfo->pTableList); + STableListInfo* pTableList = &pTaskInfo->tableqinfoList; + int32_t size = taosArrayGetSize(pTableList->pTableList); if (size == 0) { - setTaskStatus(pTaskInfo, TASK_COMPLETED); + doSetOperatorCompleted(pOperator); return NULL; } + blockDataCleanup(pInfo->pRes); + // check if it is a group by tbname - if (size == taosArrayGetSize(pInfo->pTableList)) { - blockDataCleanup(pInfo->pRes); - SArray* pUidList = taosArrayInit(1, sizeof(tb_uid_t)); - int32_t code = tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pUidList); - if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + if (pInfo->retrieveType == LASTROW_RETRIEVE_TYPE_ALL) { + if (pInfo->indexOfBufferedRes >= pInfo->pBufferredRes->info.rows) { + blockDataCleanup(pInfo->pBufferredRes); + taosArrayClear(pInfo->pUidList); + + int32_t code = tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, pInfo->pUidList); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } + + // check for tag values + int32_t resultRows = pInfo->pBufferredRes->info.rows; + ASSERT(resultRows == taosArrayGetSize(pInfo->pUidList)); + pInfo->indexOfBufferedRes = 0; } - // check for tag values - if (pInfo->pRes->info.rows > 0 && pInfo->pseudoExprSup.numOfExprs > 0) { - SExprSupp* pSup = &pInfo->pseudoExprSup; - pInfo->pRes->info.uid = *(tb_uid_t*) taosArrayGet(pUidList, 0); - addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, GET_TASKID(pTaskInfo)); + if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) { + for(int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) { + SColMatchInfo* pMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i); + int32_t slotId = pMatchInfo->targetSlotId; + + SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId); + SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, slotId); + + char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes); + bool isNull = colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes); + colDataAppend(pDst, 0, p, isNull); + } + + if (pInfo->pseudoExprSup.numOfExprs > 0) { + SExprSupp* pSup = &pInfo->pseudoExprSup; + addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, + GET_TASKID(pTaskInfo)); + } + + pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes); + int64_t* groupId = taosHashGet(pTableList->map, &pInfo->pRes->info.uid, sizeof(int64_t)); + pInfo->pRes->info.groupId = *groupId; + + pInfo->indexOfBufferedRes += 1; + pInfo->pRes->info.rows = 1; + return pInfo->pRes; + } else { + doSetOperatorCompleted(pOperator); + return NULL; + } + } else { + size_t totalGroups = taosArrayGetSize(pTableList->pGroupList); + + while (pInfo->currentGroupIndex < totalGroups) { + SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex); + + tsdbLastRowReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList, + taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader); + taosArrayClear(pInfo->pUidList); + + int32_t code = tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } + + pInfo->currentGroupIndex += 1; + + // check for tag values + if (pInfo->pRes->info.rows > 0) { + if (pInfo->pseudoExprSup.numOfExprs > 0) { + SExprSupp* pSup = &pInfo->pseudoExprSup; + pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0); + + STableKeyInfo* pKeyInfo = taosArrayGet(pGroupTableList, 0); + pInfo->pRes->info.groupId = pKeyInfo->groupId; + + addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, + GET_TASKID(pTaskInfo)); + } + + tsdbLastrowReaderClose(pInfo->pLastrowReader); + return pInfo->pRes; + } } doSetOperatorCompleted(pOperator); - return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; - } else { - // todo fetch the result for each group + return NULL; } - - return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; } void destroyLastrowScanOperator(void* param, int32_t numOfOutput) { SLastrowScanInfo* pInfo = (SLastrowScanInfo*)param; blockDataDestroy(pInfo->pRes); - tsdbLastrowReaderClose(pInfo->pLastrowReader); - taosMemoryFreeClear(param); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 89542571ea..c8f2083456 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -514,8 +514,10 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt pInput->startRowIndex = 0; // NOTE: the last parameter is the primary timestamp column + // todo: refactor this if (fmIsTimelineFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) { - pInput->pPTS = pInput->pData[j]; + pInput->pPTS = pInput->pData[j]; // in case of merge function, this is not always the ts column data. +// ASSERT(pInput->pPTS->info.type == TSDB_DATA_TYPE_TIMESTAMP); } ASSERT(pInput->pData[j] != NULL); } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { @@ -4291,6 +4293,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, } } } + int32_t len = (int32_t)(pStart - (char*)keyBuf); uint64_t groupId = calcGroupId(keyBuf, len); taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t)); @@ -4309,6 +4312,30 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, return TDB_CODE_SUCCESS; } +static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) { + memset(pCond, 0, sizeof(SQueryTableDataCond)); + + pCond->order = TSDB_ORDER_ASC; + pCond->numOfCols = 1; + pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo)); + if (pCond->colList == NULL) { + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return terrno; + } + + pCond->colList->colId = 1; + pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP; + pCond->colList->bytes = sizeof(TSKEY); + + pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; + pCond->suid = uid; + pCond->type = BLOCK_LOAD_OFFSET_ORDER; + pCond->startVersion = -1; + pCond->endVersion = -1; + + return TSDB_CODE_SUCCESS; +} + SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo, const char* pUser) { @@ -4318,7 +4345,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId); + int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, + pTableScanNode->groupSort, pHandle, pTableListInfo, queryId, taskId); if (code) { pTaskInfo->code = code; return NULL; @@ -4337,7 +4365,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; - int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId); + int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, + pTableScanNode->groupSort, pHandle, pTableListInfo, queryId, taskId); if (code) { pTaskInfo->code = code; return NULL; @@ -4366,7 +4395,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo .maxTs = INT64_MIN, }; if (pHandle) { - int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId); + int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, + pTableScanNode->groupSort, pHandle, pTableListInfo, queryId, taskId); if (code) { pTaskInfo->code = code; return NULL; @@ -4406,25 +4436,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } SQueryTableDataCond cond = {0}; - - { - cond.order = TSDB_ORDER_ASC; - cond.numOfCols = 1; - cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo)); - if (cond.colList == NULL) { - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; - } - - cond.colList->colId = 1; - cond.colList->type = TSDB_DATA_TYPE_TIMESTAMP; - cond.colList->bytes = sizeof(TSKEY); - - cond.twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; - cond.suid = pBlockNode->suid; - cond.type = BLOCK_LOAD_OFFSET_ORDER; - cond.startVersion = -1; - cond.endVersion = -1; + int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond); + if (code != TSDB_CODE_SUCCESS) { + return NULL; } STsdbReader* pReader = NULL; @@ -4435,31 +4449,20 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; - // int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId); - // if (code) { - // pTaskInfo->code = code; - // return NULL; - // } - - int32_t code = extractTableSchemaInfo(pHandle, pScanNode->scan.uid, pTaskInfo); + int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, + queryId, taskId); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; return NULL; } - pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); - if (pScanNode->scan.tableType == TSDB_SUPER_TABLE) { - code = vnodeGetAllTableList(pHandle->vnode, pScanNode->scan.uid, pTableListInfo->pTableList); - if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = terrno; - return NULL; - } - } else { // Create one table group. - STableKeyInfo info = {.lastKey = 0, .uid = pScanNode->scan.uid, .groupId = 0}; - taosArrayPush(pTableListInfo->pTableList, &info); + code = extractTableSchemaInfo(pHandle, pScanNode->scan.uid, pTaskInfo); + if (code != TSDB_CODE_SUCCESS) { + pTaskInfo->code = code; + return NULL; } - return createLastrowScanOperator(pScanNode, pHandle, pTableListInfo->pTableList, pTaskInfo); + return createLastrowScanOperator(pScanNode, pHandle, pTaskInfo); } else { ASSERT(0); } @@ -4928,6 +4931,9 @@ static void doDestroyTableList(STableListInfo* pTableqinfoList) { if (pTableqinfoList->needSortTableByGroupId) { for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) { SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i); + if (tmp == pTableqinfoList->pTableList) { + continue; + } taosArrayDestroy(tmp); } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 0f44ac48a4..6aacb0ee01 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2402,9 +2402,9 @@ typedef struct STableMergeScanInfo { SSampleExecInfo sample; // sample execution info } STableMergeScanInfo; -int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, +int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId) { - int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo); + int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTableListInfo); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2414,8 +2414,8 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle return TSDB_CODE_SUCCESS; } - pTableListInfo->needSortTableByGroupId = pTableScanNode->groupSort; - code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pGroupTags); + pTableListInfo->needSortTableByGroupId = groupSort; + code = generateGroupIdMap(pTableListInfo, pHandle, pGroupTags); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 747b1c8d9f..9ca4ee8d8f 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1981,6 +1981,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getLeastSQRFuncEnv, .initFunc = leastSQRFunctionSetup, .processFunc = leastSQRFunction, + .sprocessFunc = leastSQRScalarFunction, .finalizeFunc = leastSQRFinalize, .invertFunc = NULL, .combineFunc = leastSQRCombine, @@ -2228,7 +2229,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_cache_last_row", .type = FUNCTION_TYPE_CACHE_LAST_ROW, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .translateFunc = translateFirstLast, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index c64b1d79ba..ba35356a9c 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -2139,3 +2139,171 @@ int32_t stddevScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara return TSDB_CODE_SUCCESS; } +#define LEASTSQR_CAL(p, x, y, index, step) \ + do { \ + (p)[0][0] += (double)(x) * (x); \ + (p)[0][1] += (double)(x); \ + (p)[0][2] += (double)(x) * (y)[index]; \ + (p)[1][2] += (y)[index]; \ + (x) += step; \ + } while (0) + +int32_t leastSQRScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + SColumnInfoData *pInputData = pInput->columnData; + SColumnInfoData *pOutputData = pOutput->columnData; + + double startVal, stepVal; + double matrix[2][3] = {0}; + GET_TYPED_DATA(startVal, double, GET_PARAM_TYPE(&pInput[1]), pInput[1].columnData->pData); + GET_TYPED_DATA(stepVal, double, GET_PARAM_TYPE(&pInput[2]), pInput[2].columnData->pData); + + int32_t type = GET_PARAM_TYPE(pInput); + int64_t count = 0; + + switch(type) { + case TSDB_DATA_TYPE_TINYINT: { + int8_t *in = (int8_t *)pInputData->pData; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + continue; + } + + count++; + LEASTSQR_CAL(matrix, startVal, in, i, stepVal); + } + break; + } + case TSDB_DATA_TYPE_SMALLINT: { + int16_t *in = (int16_t *)pInputData->pData; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + continue; + } + + count++; + LEASTSQR_CAL(matrix, startVal, in, i, stepVal); + } + break; + } + case TSDB_DATA_TYPE_INT: { + int32_t *in = (int32_t *)pInputData->pData; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + continue; + } + + count++; + LEASTSQR_CAL(matrix, startVal, in, i, stepVal); + } + break; + } + case TSDB_DATA_TYPE_BIGINT: { + int64_t *in = (int64_t *)pInputData->pData; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + continue; + } + + count++; + LEASTSQR_CAL(matrix, startVal, in, i, stepVal); + } + break; + } + case TSDB_DATA_TYPE_UTINYINT: { + uint8_t *in = (uint8_t *)pInputData->pData; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + continue; + } + + count++; + LEASTSQR_CAL(matrix, startVal, in, i, stepVal); + } + break; + } + case TSDB_DATA_TYPE_USMALLINT: { + uint16_t *in = (uint16_t *)pInputData->pData; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + continue; + } + + count++; + LEASTSQR_CAL(matrix, startVal, in, i, stepVal); + } + break; + } + case TSDB_DATA_TYPE_UINT: { + uint32_t *in = (uint32_t *)pInputData->pData; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + continue; + } + + count++; + LEASTSQR_CAL(matrix, startVal, in, i, stepVal); + } + break; + } + case TSDB_DATA_TYPE_UBIGINT: { + uint64_t *in = (uint64_t *)pInputData->pData; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + continue; + } + + count++; + LEASTSQR_CAL(matrix, startVal, in, i, stepVal); + } + break; + } + case TSDB_DATA_TYPE_FLOAT: { + float *in = (float *)pInputData->pData; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + continue; + } + + count++; + LEASTSQR_CAL(matrix, startVal, in, i, stepVal); + } + break; + } + case TSDB_DATA_TYPE_DOUBLE: { + double *in = (double *)pInputData->pData; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + continue; + } + + count++; + LEASTSQR_CAL(matrix, startVal, in, i, stepVal); + } + break; + } + } + + if (count == 0) { + colDataAppendNULL(pOutputData, 0); + } else { + matrix[1][1] = (double)count; + matrix[1][0] = matrix[0][1]; + + double matrix00 = matrix[0][0] - matrix[1][0] * (matrix[0][1] / matrix[1][1]); + double matrix02 = matrix[0][2] - matrix[1][2] * (matrix[0][1] / matrix[1][1]); + double matrix12 = matrix[1][2] - matrix02 * (matrix[1][0] / matrix00); + matrix02 /= matrix00; + + matrix12 /= matrix[1][1]; + + char buf[64] = {0}; + size_t len = + snprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE, "{slop:%.6lf, intercept:%.6lf}", matrix02, matrix12); + varDataSetLen(buf, len); + colDataAppend(pOutputData, 0, buf, false); + + } + + pOutput->numOfRows = 1; + return TSDB_CODE_SUCCESS; +} diff --git a/tests/system-test/2-query/csum.py b/tests/system-test/2-query/csum.py index 708aa35183..425597a919 100644 --- a/tests/system-test/2-query/csum.py +++ b/tests/system-test/2-query/csum.py @@ -425,8 +425,8 @@ class TDTestCase: tdSql.checkRows(70) tdSql.query("select csum(c1) from stb1 partition by tbname ") tdSql.checkRows(40) - # tdSql.query("select csum(st1) from stb1 partition by tbname") - # tdSql.checkRows(70) + tdSql.query("select csum(st1) from stb1 partition by tbname") + tdSql.checkRows(70) tdSql.query("select csum(st1+c1) from stb1 partition by tbname") tdSql.checkRows(40) tdSql.query("select csum(st1+c1) from stb1 partition by tbname") @@ -445,22 +445,22 @@ class TDTestCase: tdSql.checkRows(40) # bug need fix - # tdSql.query("select tbname , csum(c1) from stb1 partition by tbname") - # tdSql.checkRows(40) - # tdSql.query("select tbname , csum(st1) from stb1 partition by tbname") - # tdSql.checkRows(70) - # tdSql.query("select tbname , csum(st1) from stb1 partition by tbname slimit 1") - # tdSql.checkRows(7) + tdSql.query("select tbname , csum(c1) from stb1 partition by tbname") + tdSql.checkRows(40) + tdSql.query("select tbname , csum(st1) from stb1 partition by tbname") + tdSql.checkRows(70) + tdSql.query("select tbname , csum(st1) from stb1 partition by tbname slimit 1") + tdSql.checkRows(7) # partition by tags - # tdSql.query("select st1 , csum(c1) from stb1 partition by st1") - # tdSql.checkRows(40) - # tdSql.query("select csum(c1) from stb1 partition by st1") - # tdSql.checkRows(40) - # tdSql.query("select st1 , csum(c1) from stb1 partition by st1 slimit 1") - # tdSql.checkRows(4) - # tdSql.query("select csum(c1) from stb1 partition by st1 slimit 1") - # tdSql.checkRows(4) + tdSql.query("select st1 , csum(c1) from stb1 partition by st1") + tdSql.checkRows(40) + tdSql.query("select csum(c1) from stb1 partition by st1") + tdSql.checkRows(40) + tdSql.query("select st1 , csum(c1) from stb1 partition by st1 slimit 1") + tdSql.checkRows(4) + tdSql.query("select csum(c1) from stb1 partition by st1 slimit 1") + tdSql.checkRows(4) # partition by col # tdSql.query("select c1 , csum(c1) from stb1 partition by c1") diff --git a/tests/system-test/2-query/last_row.py b/tests/system-test/2-query/last_row.py new file mode 100644 index 0000000000..449f5c7806 --- /dev/null +++ b/tests/system-test/2-query/last_row.py @@ -0,0 +1,803 @@ +import taos +import sys +import datetime +import inspect + +from util.log import * +from util.sql import * +from util.cases import * +import random + + +class TDTestCase: + updatecfgDict = {'debugFlag': 143, "cDebugFlag": 143, "uDebugFlag": 143, "rpcDebugFlag": 143, "tmrDebugFlag": 143, + "jniDebugFlag": 143, "simDebugFlag": 143, "dDebugFlag": 143, "dDebugFlag": 143, "vDebugFlag": 143, "mDebugFlag": 143, "qDebugFlag": 143, + "wDebugFlag": 143, "sDebugFlag": 143, "tsdbDebugFlag": 143, "tqDebugFlag": 143, "fsDebugFlag": 143, "fnDebugFlag": 143 ,"udf":0} + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), True) + self.tb_nums = 10 + self.row_nums = 20 + self.ts = 1434938400000 + self.time_step = 1000 + + def insert_datas_and_check_abs(self ,tbnums , rownums , time_step ): + tdLog.info(" prepare datas for auto check abs function ") + + tdSql.execute(" create database test cachelast 1 ") + tdSql.execute(" use test ") + tdSql.execute(" create stable stb (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint,\ + c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) tags (t1 int)") + for tbnum in range(tbnums): + tbname = "sub_tb_%d"%tbnum + tdSql.execute(" create table %s using stb tags(%d) "%(tbname , tbnum)) + + ts = self.ts + for row in range(rownums): + ts = self.ts + time_step*row + c1 = random.randint(0,10000) + c2 = random.randint(0,100000) + c3 = random.randint(0,125) + c4 = random.randint(0,125) + c5 = random.random()/1.0 + c6 = random.random()/1.0 + c7 = "'true'" + c8 = "'binary_val'" + c9 = "'nchar_val'" + c10 = ts + tdSql.execute(f" insert into {tbname} values ({ts},{c1},{c2},{c3},{c4},{c5},{c6},{c7},{c8},{c9},{c10})") + + tdSql.execute("use test") + tbnames = ["stb", "sub_tb_1"] + support_types = ["BIGINT", "SMALLINT", "TINYINT", "FLOAT", "DOUBLE", "INT"] + for tbname in tbnames: + tdSql.query("desc {}".format(tbname)) + coltypes = tdSql.queryResult + for coltype in coltypes: + colname = coltype[0] + abs_sql = "select abs({}) from {} order by tbname ".format(colname, tbname) + origin_sql = "select {} from {} order by tbname".format(colname, tbname) + if coltype[1] in support_types: + self.check_result_auto(origin_sql , abs_sql) + + + def prepare_datas(self): + tdSql.execute("create database if not exists db keep 3650 duration 1000 cachelast 1") + tdSql.execute("use db") + tdSql.execute( + '''create table stb1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + tags (t1 int) + ''' + ) + + tdSql.execute( + ''' + create table t1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + ''' + ) + for i in range(4): + tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') + + for i in range(9): + tdSql.execute( + f"insert into ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )" + ) + tdSql.execute( + f"insert into ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )" + ) + tdSql.execute( + "insert into ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )") + tdSql.execute( + "insert into ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )") + tdSql.execute( + "insert into ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )") + tdSql.execute( + "insert into ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )") + + tdSql.execute( + "insert into ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") + tdSql.execute( + "insert into ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") + tdSql.execute( + "insert into ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") + + tdSql.execute( + f'''insert into t1 values + ( '2020-04-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a ) + ( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a ) + ( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a ) + ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a ) + ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a ) + ( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a ) + ( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" ) + ( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" ) + ( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" ) + ( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ''' + ) + + def prepare_tag_datas(self): + # prepare datas + tdSql.execute( + "create database if not exists testdb keep 3650 duration 1000 cachelast 1") + tdSql.execute(" use testdb ") + + tdSql.execute(f" create stable stb1 (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp , uc1 int unsigned,\ + uc2 bigint unsigned ,uc3 smallint unsigned , uc4 tinyint unsigned ) tags( t1 int , t2 bigint , t3 smallint , t4 tinyint , t5 float , t6 double , t7 bool , t8 binary(36)\ + , t9 nchar(36) , t10 int unsigned , t11 bigint unsigned ,t12 smallint unsigned , t13 tinyint unsigned ,t14 timestamp ) ") + + tdSql.execute( + ''' + create table t1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + ''' + ) + for i in range(4): + tdSql.execute( + f'create table ct{i+1} using stb1 tags ( {1*i}, {11111*i}, {111*i}, {1*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" ,{111*i}, {1*i},{1*i},{1*i},now())') + + for i in range(9): + tdSql.execute( + f"insert into ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a ,{111*i},{1111*i},{i},{i} )" + ) + tdSql.execute( + f"insert into ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a ,{111*i},{1111*i},{i},{i})" + ) + tdSql.execute( + "insert into ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a ,0,0,0,0)") + tdSql.execute( + "insert into ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a , 999 , 9999 , 9 , 9)") + tdSql.execute( + "insert into ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a , 999 , 99999 , 9 , 9)") + tdSql.execute( + "insert into ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a ,999 , 99999 , 9 , 9)") + + tdSql.execute( + "insert into ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL , NULL, NULL, NULL, NULL) ") + tdSql.execute( + "insert into ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL , NULL, NULL, NULL, NULL) ") + tdSql.execute( + "insert into ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL , NULL, NULL, NULL, NULL ) ") + + tdSql.execute( + f'''insert into t1 values + ( '2020-04-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a ) + ( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a ) + ( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a ) + ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a ) + ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a ) + ( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a ) + ( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" ) + ( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" ) + ( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" ) + ( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ''' + ) + + def check_result_auto(self, origin_query, abs_query): + abs_result = tdSql.getResult(abs_query) + origin_result = tdSql.getResult(origin_query) + + auto_result = [] + + for row in origin_result: + row_check = [] + for elem in row: + if elem == None: + elem = None + elif elem >= 0: + elem = elem + else: + elem = -elem + row_check.append(elem) + auto_result.append(row_check) + + check_status = True + for row_index, row in enumerate(abs_result): + for col_index, elem in enumerate(row): + if auto_result[row_index][col_index] != elem: + check_status = False + if not check_status: + tdLog.notice( + "abs function value has not as expected , sql is \"%s\" " % abs_query) + sys.exit(1) + else: + tdLog.info( + "abs value check pass , it work as expected ,sql is \"%s\" " % abs_query) + + def test_errors(self): + tdSql.execute("use testdb") + + # bug need fix + # tdSql.query("select last_row(c1 ,NULL) from t1") + + error_sql_lists = [ + "select last_row from t1", + "select last_row(-+--+c1) from t1", + # "select +-last_row(c1) from t1", + # "select ++-last_row(c1) from t1", + # "select ++--last_row(c1) from t1", + # "select - -last_row(c1)*0 from t1", + # "select last_row(tbname+1) from t1 ", + "select last_row(123--123)==1 from t1", + "select last_row(c1) as 'd1' from t1", + "select last_row(c1 ,NULL) from t1", + "select last_row(,) from t1;", + "select last_row(abs(c1) ab from t1)", + "select last_row(c1) as int from t1", + "select last_row from stb1", + # "select last_row(-+--+c1) from stb1", + # "select +-last_row(c1) from stb1", + # "select ++-last_row(c1) from stb1", + # "select ++--last_row(c1) from stb1", + # "select - -last_row(c1)*0 from stb1", + # "select last_row(tbname+1) from stb1 ", + "select last_row(123--123)==1 from stb1", + "select last_row(c1) as 'd1' from stb1", + # "select last_row(c1 ,c2 ) from stb1", + "select last_row(c1 ,NULL) from stb1", + "select last_row(,) from stb1;", + "select last_row(abs(c1) ab from stb1)", + "select last_row(c1) as int from stb1" + ] + for error_sql in error_sql_lists: + tdSql.error(error_sql) + + def support_types(self): + tdSql.execute("use testdb") + tbnames = ["stb1", "t1", "ct1", "ct2"] + + for tbname in tbnames: + tdSql.query("desc {}".format(tbname)) + coltypes = tdSql.queryResult + for coltype in coltypes: + colname = coltype[0] + col_note = coltype[-1] + if col_note != "TAG": + abs_sql = "select last_row({}) from {}".format(colname, tbname) + tdSql.query(abs_sql) + + + def basic_abs_function(self): + + # basic query + tdSql.query("select c1 from ct3") + tdSql.checkRows(0) + tdSql.query("select c1 from t1") + tdSql.checkRows(12) + tdSql.query("select c1 from stb1") + tdSql.checkRows(25) + + # used for empty table , ct3 is empty + tdSql.query("select last_row(c1) from ct3") + tdSql.checkRows(0) + tdSql.query("select last_row(c2) from ct3") + tdSql.checkRows(0) + tdSql.query("select last_row(c3) from ct3") + tdSql.checkRows(0) + tdSql.query("select last_row(c4) from ct3") + tdSql.checkRows(0) + tdSql.query("select last_row(c5) from ct3") + tdSql.checkRows(0) + tdSql.query("select last_row(c6) from ct3") + + # used for regular table + + # bug need fix + tdSql.query("select last_row(c1) from t1") + tdSql.checkData(0, 0, None) + tdSql.query("select last_row(c1) from ct4") + tdSql.checkData(0, 0, None) + tdSql.query("select last_row(c1) from stb1") + tdSql.checkData(0, 0, None) + + # # bug need fix + tdSql.query("select last_row(c1), c2, c3 , c4, c5 from t1") + tdSql.checkData(0, 0, None) + tdSql.checkData(0, 1, None) + tdSql.checkData(0, 2, None) + + # # bug need fix + tdSql.query("select last_row(c1), c2, c3 , c4, c5 from ct1") + tdSql.checkData(0, 0, 9) + tdSql.checkData(0, 1, -99999) + tdSql.checkData(0, 2, -999) + tdSql.checkData(0, 3, None) + tdSql.checkData(0, 4,-9.99000) + + # bug need fix + # tdSql.query("select last_row(c1), c2, c3 , c4, c5 from stb1 where tbname='ct1'") + # tdSql.checkData(0, 0, 9) + # tdSql.checkData(0, 1, -99999) + # tdSql.checkData(0, 2, -999) + # tdSql.checkData(0, 3, None) + # tdSql.checkData(0, 4,-9.99000) + + # bug fix + tdSql.query("select last_row(abs(c1)) from ct1") + tdSql.checkData(0,0,9) + + # # bug fix + tdSql.query("select last_row(c1+1) from ct1") + tdSql.query("select last_row(c1+1) from stb1") + tdSql.query("select last_row(c1+1) from t1") + + # used for stable table + tdSql.query("select last_row(c1 ,c2 ,c3) ,last_row(c4) from ct1") + tdSql.checkData(0,0,9) + tdSql.checkData(0,1,-99999) + tdSql.checkData(0,2,-999) + tdSql.checkData(0,3,None) + + # bug need fix + tdSql.query("select last_row(c1 ,c2 ,c3) from stb1 ") + tdSql.checkData(0,0,None) + tdSql.checkData(0,1,None) + tdSql.checkData(0,2,None) + + # tdSql.query("select last_row(c1 ,c2 ,c3) ,last_row(c4) from stb1 where ts 5 ") + tdSql.checkData(0, 0, 6) + tdSql.checkData(0, 1, 6.000000000) + tdSql.checkData(0, 2, 6.000000000) + tdSql.checkData(0, 3, 5.900000000) + tdSql.checkData(0, 4, 2.084962501) + + tdSql.query( + "select last_row(c1,c2,c1+5) from ct4 where c1=5 ") + tdSql.checkData(0, 0, 5) + tdSql.checkData(0, 1, 55555) + tdSql.checkData(0, 2, 10.000000000) + + tdSql.query( + "select last(c1,c2,c1+5) from ct4 where c1=5 ") + tdSql.checkData(0, 0, 5) + tdSql.checkData(0, 1, 55555) + tdSql.checkData(0, 2, 10.000000000) + + tdSql.query( + "select c1,c2 , abs(c1) -0 ,ceil(c1-0.1)-0 ,floor(c1+0.1)-0.1 ,ceil(log(c1,2)-0.5) from ct4 where c1>log(c1,2) limit 1 ") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 8) + tdSql.checkData(0, 1, 88888) + tdSql.checkData(0, 2, 8.000000000) + tdSql.checkData(0, 3, 8.000000000) + tdSql.checkData(0, 4, 7.900000000) + tdSql.checkData(0, 5, 3.000000000) + + def abs_Arithmetic(self): + pass + + def check_boundary_values(self): + + tdSql.execute("drop database if exists bound_test") + tdSql.execute("create database if not exists bound_test cachelast 2") + time.sleep(3) + tdSql.execute("use bound_test") + tdSql.execute( + "create table stb_bound (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(32),c9 nchar(32), c10 timestamp) tags (t1 int);" + ) + tdSql.execute(f'create table sub1_bound using stb_bound tags ( 1 )') + tdSql.execute( + f"insert into sub1_bound values ( now()-1s, 2147483647, 9223372036854775807, 32767, 127, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )" + ) + tdSql.execute( + f"insert into sub1_bound values ( now()-1s, -2147483647, -9223372036854775807, -32767, -127, -3.40E+38, -1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )" + ) + tdSql.execute( + f"insert into sub1_bound values ( now(), 2147483646, 9223372036854775806, 32766, 126, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )" + ) + tdSql.execute( + f"insert into sub1_bound values ( now(), -2147483646, -9223372036854775806, -32766, -126, -3.40E+38, -1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )" + ) + tdSql.error( + f"insert into sub1_bound values ( now()+1s, 2147483648, 9223372036854775808, 32768, 128, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )" + ) + + # check basic elem for table per row + tdSql.query( + "select last(c1) ,last_row(c2), last_row(c3)+1 , last(c4)+1 from sub1_bound ") + tdSql.checkData(0, 0, -2147483646) + tdSql.checkData(0, 1, -9223372036854775806) + tdSql.checkData(0, 2, -32765.000000000) + tdSql.checkData(0, 3, -125.000000000) + # check + - * / in functions + tdSql.query( + "select last_row(c1+1) ,last_row(c2) , last(c3*1) , last(c4/2) from sub1_bound ") + + def test_tag_compute_for_scalar_function(self): + + tdSql.execute("use testdb") + + # bug need fix + + tdSql.query(" select sum(c1) from stb1 where t1+10 >1; ") + tdSql.query("select c1 ,t1 from stb1 where t1 =0 ") + tdSql.checkRows(13) + tdSql.query("select last_row(c1,t1) from stb1 ") + tdSql.checkData(0,0,None) + tdSql.checkData(0,1,3) + # tdSql.query("select last_row(c1),t1 from stb1 ") + # tdSql.checkData(0,0,None) + # tdSql.checkData(0,1,3) + tdSql.query("select last_row(c1,t1),last(t1) from stb1 ") + tdSql.checkData(0,0,None) + tdSql.checkData(0,1,3) + tdSql.checkData(0,2,3) + + tdSql.query("select last_row(t1) from stb1 where t1 >0 ") + tdSql.checkRows(1) + tdSql.checkData(0,0,3) + tdSql.query("select last_row(t1) from stb1 where t1 =3 ") + tdSql.checkRows(1) + tdSql.checkData(0,0,3) + + tdSql.query("select last_row(t1) from stb1 where t1 =2") + tdSql.checkRows(0) + + # nest query for last_row + # tdSql.query("select last_row(t1) from (select c1 ,t1 from stb1)") + # tdSql.checkData(0,0,61) + # tdSql.query("select distinct(c1) ,t1 from stb1") + # tdSql.checkRows(20) + tdSql.query("select last_row(c1) from (select _rowts , c1 ,t1 from stb1)") + tdSql.checkData(0,0,None) + + tdSql.query("select last_row(c1) from (select ts , c1 ,t1 from stb1)") + tdSql.checkData(0,0,None) + + tdSql.query("select ts , last_row(c1) ,c1 from (select ts , c1 ,t1 from stb1)") + tdSql.checkData(0,1,None,None) + + tdSql.query("select ts , last_row(c1) ,c1 from (select ts , max(c1) c1 ,t1 from stb1 where ts >now -1h and ts now -1h and ts now -1h and ts ="2022-07-06 16:00:00.000 " and ts < "2022-07-06 17:00:00.000 " interval(50s) sliding(30s) fill(NULL)') + # tdSql.checkRows(40) + # tdSql.checkData(0,0,None) + tdSql.query('select max(c1) from stb1 where ts>="2022-07-06 16:00:00.000 " and ts < "2022-07-06 17:00:00.000 " interval(50s) sliding(30s)') + tdSql.checkRows(5) + + + + def support_super_table_test(self): + tdSql.execute(" use testdb ") + self.check_result_auto( " select c1 from stb1 order by ts " , "select abs(c1) from stb1 order by ts" ) + self.check_result_auto( " select c1 from stb1 order by tbname " , "select abs(c1) from stb1 order by tbname" ) + self.check_result_auto( " select c1 from stb1 where c1 > 0 order by tbname " , "select abs(c1) from stb1 where c1 > 0 order by tbname" ) + self.check_result_auto( " select c1 from stb1 where c1 > 0 order by tbname " , "select abs(c1) from stb1 where c1 > 0 order by tbname" ) + + self.check_result_auto( " select t1,c1 from stb1 order by ts " , "select t1, abs(c1) from stb1 order by ts" ) + self.check_result_auto( " select t2,c1 from stb1 order by tbname " , "select t2 ,abs(c1) from stb1 order by tbname" ) + self.check_result_auto( " select t3,c1 from stb1 where c1 > 0 order by tbname " , "select t3 ,abs(c1) from stb1 where c1 > 0 order by tbname" ) + self.check_result_auto( " select t4,c1 from stb1 where c1 > 0 order by tbname " , "select t4 , abs(c1) from stb1 where c1 > 0 order by tbname" ) + pass + + + def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring + # tdSql.prepare() + + tdLog.printNoPrefix("==========step1:create table ==============") + + self.prepare_datas() + self.prepare_tag_datas() + + tdLog.printNoPrefix("==========step2:test errors ==============") + + self.test_errors() + + tdLog.printNoPrefix("==========step3:support types ============") + + self.support_types() + + tdLog.printNoPrefix("==========step4: abs basic query ============") + + self.basic_abs_function() + + tdLog.printNoPrefix("==========step5: abs boundary query ============") + + self.check_boundary_values() + + tdLog.printNoPrefix("==========step6: abs filter query ============") + + self.abs_func_filter() + + tdLog.printNoPrefix("==========step6: tag coumpute query ============") + + self.test_tag_compute_for_scalar_function() + + tdLog.printNoPrefix("==========step7: check result of query ============") + + self.insert_datas_and_check_abs(self.tb_nums,self.row_nums,self.time_step) + + tdLog.printNoPrefix("==========step8: check abs result of stable query ============") + + self.support_super_table_test() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/2-query/max_partition.py b/tests/system-test/2-query/max_partition.py index 90b8d25cb1..cf0d6639c2 100644 --- a/tests/system-test/2-query/max_partition.py +++ b/tests/system-test/2-query/max_partition.py @@ -45,8 +45,8 @@ class TDTestCase: tdSql.query(" select max(c1) from stb group by tbname order by tbname ") tdSql.checkRows(self.tb_nums) # bug need fix - # tdSql.query(" select max(t1) from stb group by t2 order by t2 ") - # tdSql.checkRows(self.tb_nums) + tdSql.query(" select max(t2) from stb group by t2 order by t2 ") + tdSql.checkRows(self.tb_nums) tdSql.query(" select max(c1) from stb group by c1 order by c1 ") tdSql.checkRows(self.row_nums+1) @@ -90,8 +90,8 @@ class TDTestCase: tdSql.query("select tbname , max(t2) from stb partition by t2 order by t2") # # bug need fix - # tdSql.query("select t2 , max(t2) from stb partition by t2 order by t2") - # tdSql.checkRows(self.tb_nums) + tdSql.query("select t2 , max(t2) from stb partition by t2 order by t2") + tdSql.checkRows(self.tb_nums) tdSql.query("select tbname , max(c1) from stb partition by tbname order by tbname") tdSql.checkRows(self.tb_nums) @@ -126,8 +126,8 @@ class TDTestCase: tdSql.checkData(0,0,self.row_nums) # bug need fix - # tdSql.query("select count(c1) , max(t1) ,abs(c1) from stb partition by abs(c1) order by abs(c1)") - # tdSql.checkRows(self.row_nums+1) + tdSql.query("select count(c1) , max(t2) ,abs(c1) from stb partition by abs(c1) order by abs(c1)") + tdSql.checkRows(self.row_nums+1) tdSql.query("select max(ceil(c2)) , max(floor(t2)) ,max(floor(c2)) from stb partition by abs(c2) order by abs(c2)") @@ -136,6 +136,18 @@ class TDTestCase: tdSql.query("select max(ceil(c1-2)) , max(floor(t2+1)) ,max(c2-c1) from stb partition by abs(floor(c1)) order by abs(floor(c1))") tdSql.checkRows(self.row_nums+1) + + tdSql.query("select tbname , max(c1) ,c1 from stb partition by tbname order by tbname") + tdSql.checkRows(self.tb_nums) + tdSql.checkData(0,0,'sub_stb_0') + tdSql.checkData(0,1,9) + tdSql.checkData(0,2,9) + + tdSql.query("select tbname ,top(c1,1) ,c1 from stb partition by tbname order by tbname") + tdSql.checkRows(self.tb_nums) + + tdSql.query(" select c1 , sample(c1,2) from stb partition by tbname order by tbname ") + tdSql.checkRows(self.tb_nums*2) # interval diff --git a/tests/system-test/2-query/sample.py b/tests/system-test/2-query/sample.py index f583b7dd78..84ff5a0056 100644 --- a/tests/system-test/2-query/sample.py +++ b/tests/system-test/2-query/sample.py @@ -31,6 +31,7 @@ class TDTestCase: def init(self, conn, logSql): tdLog.debug("start to execute %s" % __file__) tdSql.init(conn.cursor()) + self.ts = 1537146000000 def sample_query_form(self, sel="select", func="sample(", col="c1", m_comm =",", k=1,r_comm=")", alias="", fr="from",table_expr="t1", condition=""): ''' @@ -624,10 +625,11 @@ class TDTestCase: tdLog.info(" sample data is in datas groups ,successed sql is : %s" % sample_query ) else: tdLog.exit(" sample data is not in datas groups ,failed sql is : %s" % sample_query ) - + + def basic_sample_query(self): tdSql.execute(" drop database if exists db ") - tdSql.execute(" create database if not exists db duration 300 ") + tdSql.execute(" create database if not exists db duration 300d ") tdSql.execute(" use db ") tdSql.execute( '''create table stb1 @@ -759,14 +761,6 @@ class TDTestCase: self.check_sample("select sample( c1 ,3 ) from t1 where c1 between 1 and 10" ,"select c1 from t1 where c1 between 1 and 10") - tdSql.query("select sample(c1,2) ,c2,c3 ,c5 from stb1") - tdSql.checkRows(2) - tdSql.checkCols(4) - - self.check_sample("select sample( c1 ,3 ),c2,c3,c4,c5 from t1 where c1 between 1 and 10" ,"select c1,c2,c3,c4,c5 from t1 where c1 between 1 and 10") - self.check_sample("select sample( c1 ,3 ),c2,c3,c4,c5 from stb1 where c1 between 1 and 10" ,"select c1,c2,c3,c4,c5 from stb1 where c1 between 1 and 10") - self.check_sample("select sample( c1 ,3 ),t1 from stb1 where c1 between 1 and 10" ,"select c1,t1 from stb1 where c1 between 1 and 10") - # join tdSql.query("select sample( ct4.c1 , 1 ) from ct1, ct4 where ct4.ts=ct1.ts") @@ -779,8 +773,8 @@ class TDTestCase: self.check_sample("select sample(c1,2) from stb1 partition by tbname" , "select c1 from stb1 partition by tbname") # nest query - tdSql.query("select sample(c1,2) from (select c1 from t1); ") - tdSql.checkRows(2) + # tdSql.query("select sample(c1,2) from (select c1 from t1); ") + # tdSql.checkRows(2) # union all tdSql.query("select sample(c1,2) from t1 union all select sample(c1,3) from t1") @@ -798,36 +792,6 @@ class TDTestCase: tdSql.query("select sample(c1,100)+2 from ct1") tdSql.query("select abs(sample(c1,100)) from ct1") - # support stable and tbname - tdSql.query("select tbname ,sample(c1,2) from stb1 partition by tbname order by tbname") - tdSql.checkRows(4) - tdSql.checkData(0,0,'ct1') - tdSql.checkData(3,0,'ct4') - - # # bug need fix - # tdSql.query(" select tbname ,c1 ,t1, sample(c1,2) from stb1 partition by tbname order by tbname ") - # tdSql.checkRows(4) - # tdSql.checkData(0,0,'ct1') - # tdSql.checkData(3,0,'ct4') - # tdSql.checkData(0,2,1) - # tdSql.checkData(3,2,4) - - tdSql.query(" select tbname ,c1 ,t1, sample(c1,2) from stb1 partition by t1 order by t1 ") - tdSql.checkRows(4) - tdSql.checkData(0,0,'ct1') - tdSql.checkData(3,0,'ct4') - tdSql.checkData(0,2,1) - tdSql.checkData(3,2,4) - - # bug need fix - # tdSql.query(" select tbname ,c1 ,t1, sample(c1,2) from stb1 partition by c1 order by c1 ") - # tdSql.checkRows(21) - - # bug need fix - # tdSql.query(" select sample(c1,2) from stb1 partition by c1 ") - # tdSql.checkRows(21) - - def sample_test_run(self) : tdLog.printNoPrefix("==========support sample function==========") tbnum = 10 @@ -891,11 +855,29 @@ class TDTestCase: self.basic_sample_query() + def sample_big_data(self): + tdSql.execute("create database sample_db") + tdSql.execute("use sample_db") + tdSql.execute("create stable st (ts timestamp ,c1 int ) tags(ind int)" ) + tdSql.execute("create table sub_tb using st tags(1)") + + for i in range(2000): + ts = self.ts+i*10 + tdSql.execute(f"insert into sub_tb values({ts} ,{i})") + + tdSql.query("select count(*) from st") + tdSql.checkData(0,0,2000) + tdSql.query("select sample(c1 ,1000) from st") + tdSql.checkRows(1000) + + + def run(self): import traceback try: # run in develop branch self.sample_test_run() + self.sample_big_data() pass except Exception as e: traceback.print_exc()