Merge pull request #26008 from taosdata/fix/TS-4833/dataloaded

fix data load error with partition by + interval and limit
This commit is contained in:
dapan1121 2024-06-17 08:32:19 +08:00 committed by GitHub
commit e34a8b48df
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 318 additions and 97 deletions

View File

@ -240,7 +240,7 @@ typedef struct SDataBlockInfo {
} SDataBlockInfo;
typedef struct SSDataBlock {
SColumnDataAgg** pBlockAgg;
SColumnDataAgg* pBlockAgg;
SArray* pDataBlock; // SArray<SColumnInfoData>
SDataBlockInfo info;
} SSDataBlock;
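For context, the structural change above replaces the per-block array of SColumnDataAgg pointers with a flat array of structs, where a slot without SMA data is marked by colId == -1 instead of a NULL pointer (the same sentinel later checked in colDataIsNull). A minimal, self-contained sketch of that layout, using simplified stand-in types rather than the real TDengine headers:

/* Sketch only: stand-in types, not the actual TDengine definitions. */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct {
  int16_t colId;      /* -1 now means "no SMA for this slot" (previously a NULL pointer) */
  int64_t numOfNull;
} SColumnDataAggSketch;

/* Before: SColumnDataAggSketch** pBlockAgg, with NULL entries for missing slots.
 * After:  one flat array, every slot allocated, absence signalled by colId == -1. */
static SColumnDataAggSketch* allocBlockAgg(size_t numOfCols) {
  SColumnDataAggSketch* agg = calloc(numOfCols, sizeof(SColumnDataAggSketch));
  if (agg == NULL) return NULL;
  for (size_t i = 0; i < numOfCols; ++i) {
    agg[i].colId = -1;                 /* mark every slot as "not filled" up front */
  }
  return agg;
}

static bool slotHasSMA(const SColumnDataAggSketch* agg, size_t slot) {
  /* callers now check the sentinel instead of a NULL pointer */
  return agg != NULL && agg[slot].colId != -1;
}

int main(void) {
  SColumnDataAggSketch* agg = allocBlockAgg(4);
  if (agg == NULL) return 1;
  agg[2].colId = 7;                    /* pretend column 7's SMA landed in slot 2 */
  printf("slot 2 has SMA: %d\n", slotHasSMA(agg, 2));
  printf("slot 3 has SMA: %d\n", slotHasSMA(agg, 3));
  free(agg);
  return 0;
}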

View File

@ -102,7 +102,7 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u
return false;
}
if (pColAgg != NULL) {
if (pColAgg != NULL && pColAgg->colId != -1) {
if (pColAgg->numOfNull == totalRows) {
ASSERT(pColumnInfoData->nullbitmap == NULL);
return true;
@ -282,6 +282,8 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* p
void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList);
void copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc);
#ifdef __cplusplus
}
#endif

View File

@ -58,7 +58,7 @@ extern int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict
extern int32_t filterConverNcharColumns(SFilterInfo *pFilterInfo, int32_t rows, bool *gotNchar);
extern int32_t filterFreeNcharColumns(SFilterInfo *pFilterInfo);
extern void filterFreeInfo(SFilterInfo *info);
extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pColsAgg, int32_t numOfCols, int32_t numOfRows);
extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pColsAgg, int32_t numOfCols, int32_t numOfRows);
/* condition split interface */
int32_t filterPartitionCond(SNode **pCondition, SNode **pPrimaryKeyCond, SNode **pTagIndexCond, SNode **pTagCond,

View File

@ -21,7 +21,7 @@
#define MALLOC_ALIGN_BYTES 32
static void copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc);
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
@ -848,7 +848,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
if (pBlock->pBlockAgg == NULL) {
isNull = colDataIsNull_s(pColData, j);
} else {
isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg[i]);
isNull = colDataIsNull(pColData, pBlock->info.rows, j, &pBlock->pBlockAgg[i]);
}
if (isNull) {

View File

@ -4897,7 +4897,7 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
}
int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave, bool* hasNullSMA) {
SColumnDataAgg*** pBlockSMA = &pDataBlock->pBlockAgg;
SColumnDataAgg** pBlockSMA = &pDataBlock->pBlockAgg;
int32_t code = 0;
*allHave = false;
@ -4952,7 +4952,13 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
if (pResBlock->pBlockAgg == NULL) {
size_t num = taosArrayGetSize(pResBlock->pDataBlock);
pResBlock->pBlockAgg = taosMemoryCalloc(num, POINTER_BYTES);
pResBlock->pBlockAgg = taosMemoryCalloc(num, sizeof(SColumnDataAgg));
if (pResBlock->pBlockAgg == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for(int i = 0; i < num; ++i) {
pResBlock->pBlockAgg[i].colId = -1;
}
}
// do fill all null column value SMA info
@ -4964,13 +4970,12 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
while (j < numOfCols && i < size) {
SColumnDataAgg* pAgg = &pSup->colAggArray.data[i];
if (pAgg->colId == pSup->colId[j]) {
pResBlock->pBlockAgg[pSup->slotId[j]] = pAgg;
pResBlock->pBlockAgg[pSup->slotId[j]] = *pAgg;
i += 1;
j += 1;
} else if (pAgg->colId < pSup->colId[j]) {
i += 1;
} else if (pSup->colId[j] < pAgg->colId) {
pResBlock->pBlockAgg[pSup->slotId[j]] = NULL;
*allHave = false;
j += 1;
}

View File

@ -624,6 +624,8 @@ typedef struct SDataGroupInfo {
uint64_t groupId;
int64_t numOfRows;
SArray* pPageList;
SArray* blockForNotLoaded; // SSDataBlocks whose data has not been loaded
int32_t offsetForNotLoaded; // read offset into blockForNotLoaded
} SDataGroupInfo;
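The two fields added here carry data blocks whose contents were never loaded from storage: doHashPartition stashes such blocks per group, and buildPartitionResultForNotLoadBlock later drains them one at a time through the offset cursor. A minimal sketch of that stash-and-drain bookkeeping, with simplified stand-in types in place of SArray and SSDataBlock:

/* Sketch only: stand-in types for SArray/SSDataBlock, not the real ones. */
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct { int rows; } BlockSketch;           /* stand-in for SSDataBlock */

typedef struct {
  BlockSketch** blocks;   /* stand-in for SArray* blockForNotLoaded */
  size_t        size;
  size_t        cap;
  size_t        offset;   /* stand-in for offsetForNotLoaded: next block to hand out */
} GroupNotLoaded;

/* doHashPartition path: a block whose data was never loaded is stashed as-is */
static int stashNotLoaded(GroupNotLoaded* g, BlockSketch* blk) {
  if (g->size == g->cap) {
    size_t cap = g->cap ? g->cap * 2 : 4;
    BlockSketch** p = realloc(g->blocks, cap * sizeof(*p));
    if (p == NULL) return -1;
    g->blocks = p;
    g->cap = cap;
  }
  g->blocks[g->size++] = blk;
  return 0;
}

/* buildPartitionResultForNotLoadBlock path: drain one block per call via the cursor */
static BlockSketch* nextNotLoaded(GroupNotLoaded* g) {
  if (g->offset < g->size) return g->blocks[g->offset++];
  return NULL;            /* nothing left; caller falls through to the next group */
}

int main(void) {
  GroupNotLoaded g = {0};
  BlockSketch a = {.rows = 100}, b = {.rows = 42};
  stashNotLoaded(&g, &a);
  stashNotLoaded(&g, &b);
  for (BlockSketch* blk; (blk = nextNotLoaded(&g)) != NULL;) {
    printf("emit not-loaded block with %d rows\n", blk->rows);
  }
  free(g.blocks);
  return 0;
}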
typedef struct SWindowRowsSup {

View File

@ -2368,7 +2368,7 @@ int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t ol
for (int32_t i = 0; i < pSortGroupCols->size; ++i) {
const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId];
if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId];
if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
if (isNull[i] != 1) return 1;
@ -2403,7 +2403,7 @@ int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock*
const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
if (pCol->slotId > pBlock->pDataBlock->size) continue;
if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId];
if (pBlock->pBlockAgg) pColAgg = &pBlock->pBlockAgg[pCol->slotId];
if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
isNull[i] = 1;

View File

@ -434,8 +434,8 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
int32_t slotId = pFuncParam->pCol->slotId;
pInput->pColumnDataAgg[j] = pBlock->pBlockAgg[slotId];
if (pInput->pColumnDataAgg[j] == NULL) {
pInput->pColumnDataAgg[j] = &pBlock->pBlockAgg[slotId];
if (pInput->pColumnDataAgg[j]->colId == -1) {
pInput->colDataSMAIsSet = false;
}

View File

@ -131,7 +131,7 @@ static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlo
SColumn* pCol = taosArrayGet(pGroupCols, i);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
if (pBlock->pBlockAgg != NULL) {
pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
}
bool isNull = colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg);
@ -187,7 +187,7 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData
}
if (pBlock->pBlockAgg != NULL) {
pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
}
SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
@ -563,6 +563,55 @@ _error:
return NULL;
}
SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBlock* pDataBlock) {
if (pDataBlock == NULL) {
return NULL;
}
SSDataBlock* pDstBlock = createDataBlock();
pDstBlock->info = pDataBlock->info;
pDstBlock->info.id.blockId = pOperator->resultDataBlockId;
pDstBlock->info.capacity = 0;
pDstBlock->info.rowSize = 0;
size_t numOfCols = pOperator->exprSupp.numOfExprs;
if (pDataBlock->pBlockAgg) {
pDstBlock->pBlockAgg = taosMemoryCalloc(numOfCols, sizeof(SColumnDataAgg));
if (pDstBlock->pBlockAgg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
blockDataDestroy(pDstBlock);
return NULL;
}
for(int i = 0; i < numOfCols; ++i) {
pDstBlock->pBlockAgg[i].colId = -1;
}
}
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, slotId);
SColumnInfoData colInfo = {.hasNull = true, .info = pSrc->info};
blockDataAppendColInfo(pDstBlock, &colInfo);
SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i);
if (pDataBlock->pBlockAgg && pDataBlock->pBlockAgg[slotId].colId != -1) {
pDstBlock->pBlockAgg[i] = pDataBlock->pBlockAgg[slotId];
} else {
int32_t code = doEnsureCapacity(pDst, &pDstBlock->info, pDataBlock->info.rows, false);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
blockDataDestroy(pDstBlock);
return NULL;
}
colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info);
}
}
return pDstBlock;
}
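Note on the helper above: when the source block carries SMA entries, createBlockDataNotLoaded copies only the column metadata and the per-slot aggregates; only columns without an SMA entry (colId == -1) fall back to copying actual row data, and doHashPartition later tags the clone with dataLoad = 0 before stashing it in the group's blockForNotLoaded list.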
static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SPartitionOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -584,71 +633,86 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
pGroupInfo->groupId = calcGroupId(pInfo->keyBuf, len);
}
// number of rows
int32_t* rows = (int32_t*)pPage;
if (pBlock->info.dataLoad) {
// number of rows
int32_t* rows = (int32_t*)pPage;
size_t numOfCols = pOperator->exprSupp.numOfExprs;
for (int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
size_t numOfCols = pOperator->exprSupp.numOfExprs;
for (int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
int32_t bytes = pColInfoData->info.bytes;
int32_t startOffset = pInfo->columnOffset[i];
int32_t bytes = pColInfoData->info.bytes;
int32_t startOffset = pInfo->columnOffset[i];
int32_t* columnLen = NULL;
int32_t contentLen = 0;
int32_t* columnLen = NULL;
int32_t contentLen = 0;
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
int32_t* offset = (int32_t*)((char*)pPage + startOffset);
columnLen = (int32_t*)((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity);
char* data = (char*)((char*)columnLen + sizeof(int32_t));
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
int32_t* offset = (int32_t*)((char*)pPage + startOffset);
columnLen = (int32_t*)((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity);
char* data = (char*)((char*)columnLen + sizeof(int32_t));
if (colDataIsNull_s(pColInfoData, j)) {
offset[(*rows)] = -1;
contentLen = 0;
} else if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
offset[*rows] = (*columnLen);
char* src = colDataGetData(pColInfoData, j);
int32_t dataLen = getJsonValueLen(src);
if (colDataIsNull_s(pColInfoData, j)) {
offset[(*rows)] = -1;
contentLen = 0;
} else if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
offset[*rows] = (*columnLen);
char* src = colDataGetData(pColInfoData, j);
int32_t dataLen = getJsonValueLen(src);
memcpy(data + (*columnLen), src, dataLen);
int32_t v = (data + (*columnLen) + dataLen - (char*)pPage);
ASSERT(v > 0);
memcpy(data + (*columnLen), src, dataLen);
int32_t v = (data + (*columnLen) + dataLen - (char*)pPage);
ASSERT(v > 0);
contentLen = dataLen;
contentLen = dataLen;
} else {
offset[*rows] = (*columnLen);
char* src = colDataGetData(pColInfoData, j);
memcpy(data + (*columnLen), src, varDataTLen(src));
int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage);
ASSERT(v > 0);
contentLen = varDataTLen(src);
}
} else {
offset[*rows] = (*columnLen);
char* src = colDataGetData(pColInfoData, j);
memcpy(data + (*columnLen), src, varDataTLen(src));
int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage);
ASSERT(v > 0);
char* bitmap = (char*)pPage + startOffset;
columnLen = (int32_t*)((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity));
char* data = (char*)columnLen + sizeof(int32_t);
contentLen = varDataTLen(src);
bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j);
if (isNull) {
colDataSetNull_f(bitmap, (*rows));
} else {
memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes);
ASSERT((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf));
}
contentLen = bytes;
}
} else {
char* bitmap = (char*)pPage + startOffset;
columnLen = (int32_t*)((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity));
char* data = (char*)columnLen + sizeof(int32_t);
bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j);
if (isNull) {
colDataSetNull_f(bitmap, (*rows));
} else {
memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes);
ASSERT((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf));
}
contentLen = bytes;
(*columnLen) += contentLen;
}
(*columnLen) += contentLen;
(*rows) += 1;
setBufPageDirty(pPage, true);
releaseBufPage(pInfo->pBuf, pPage);
} else {
SSDataBlock* dataNotLoadBlock = createBlockDataNotLoaded(pOperator, pBlock);
if (dataNotLoadBlock == NULL) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
if (pGroupInfo->blockForNotLoaded == NULL) {
pGroupInfo->blockForNotLoaded = taosArrayInit(0, sizeof(SSDataBlock*));
pGroupInfo->offsetForNotLoaded = 0;
}
dataNotLoadBlock->info.id.groupId = pGroupInfo->groupId;
dataNotLoadBlock->info.dataLoad = 0;
taosArrayPush(pGroupInfo->blockForNotLoaded, &dataNotLoadBlock);
break;
}
(*rows) += 1;
setBufPageDirty(pPage, true);
releaseBufPage(pInfo->pBuf, pPage);
}
}
@ -730,6 +794,14 @@ static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) {
int32_t size = taosArrayGetSize(pInfo->sortedGroupArray);
for (int32_t i = 0; i < size; i++) {
SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i);
if (pGp->blockForNotLoaded) {
for (int32_t i = 0; i < pGp->blockForNotLoaded->size; i++) {
SSDataBlock** pBlock = taosArrayGet(pGp->blockForNotLoaded, i);
blockDataDestroy(*pBlock);
}
taosArrayClear(pGp->blockForNotLoaded);
pGp->offsetForNotLoaded = 0;
}
taosArrayDestroy(pGp->pPageList);
}
taosArrayClear(pInfo->sortedGroupArray);
@ -747,6 +819,15 @@ static int compareDataGroupInfo(const void* group1, const void* group2) {
return (pGroupInfo1->groupId < pGroupInfo2->groupId) ? -1 : 1;
}
static SSDataBlock* buildPartitionResultForNotLoadBlock(SDataGroupInfo* pGroupInfo) {
if (pGroupInfo->blockForNotLoaded && pGroupInfo->offsetForNotLoaded < pGroupInfo->blockForNotLoaded->size) {
SSDataBlock** pBlock = taosArrayGet(pGroupInfo->blockForNotLoaded, pGroupInfo->offsetForNotLoaded);
pGroupInfo->offsetForNotLoaded++;
return *pBlock;
}
return NULL;
}
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
SPartitionOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -756,13 +837,17 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
SDataGroupInfo* pGroupInfo =
(pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL;
if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) {
if(pGroupInfo != NULL) {
SSDataBlock* ret = buildPartitionResultForNotLoadBlock(pGroupInfo);
if(ret != NULL) return ret;
}
// try next group data
++pInfo->groupIndex;
if (pInfo->groupIndex >= taosArrayGetSize(pInfo->sortedGroupArray)) {
if (pInfo->groupIndex + 1 >= taosArrayGetSize(pInfo->sortedGroupArray)) {
setOperatorCompleted(pOperator);
clearPartitionOperator(pInfo);
return NULL;
}
++pInfo->groupIndex;
pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex);
pInfo->pageIndex = 0;
@ -774,6 +859,20 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, terrno);
}
if (*(int32_t*)page == 0) {
releaseBufPage(pInfo->pBuf, page);
SSDataBlock* ret = buildPartitionResultForNotLoadBlock(pGroupInfo);
if (ret != NULL) return ret;
if (pInfo->groupIndex + 1 < taosArrayGetSize(pInfo->sortedGroupArray)) {
pInfo->groupIndex++;
pInfo->pageIndex = 0;
} else {
setOperatorCompleted(pOperator);
clearPartitionOperator(pInfo);
return NULL;
}
return buildPartitionResult(pOperator);
}
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->rowCapacity);
blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity);
@ -783,6 +882,8 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
pInfo->binfo.pRes->info.id.groupId = pGroupInfo->groupId;
pInfo->binfo.pRes->info.dataLoad = 1;
pInfo->orderedRows = 0;
} else if (pInfo->pOrderInfoArr == NULL) {
qError("Exception, remainRows not zero, but pOrderInfoArr is NULL");
}
if (pInfo->pOrderInfoArr) {

View File

@ -220,7 +220,7 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo*
return code;
}
static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols,
static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg* pColsAgg, int32_t numOfCols,
int32_t numOfRows) {
if (pColsAgg == NULL || pFilterInfo == NULL) {
return true;
@ -769,6 +769,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
bool hasNext = false;
int32_t code = TSDB_CODE_SUCCESS;
pBlock->info.dataLoad = false;
int64_t st = taosGetTimestampUs();

View File

@ -911,7 +911,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
struct SColumnDataAgg* pAgg = NULL;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
pAgg = (pBlock->pBlockAgg != NULL) ? pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL;
pAgg = (pBlock->pBlockAgg != NULL) ? &pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL;
if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
continue;
}

View File

@ -651,7 +651,7 @@ int32_t tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock,
leftNull = colDataIsNull_t(pLeftColInfoData, leftRowIndex, isVarType);
} else {
leftNull =
colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, leftRowIndex, pLeftBlock->pBlockAgg[pOrder->slotId]);
colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, leftRowIndex, &pLeftBlock->pBlockAgg[pOrder->slotId]);
}
}
@ -661,7 +661,7 @@ int32_t tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock,
rightNull = colDataIsNull_t(pRightColInfoData, rightRowIndex, isVarType);
} else {
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, rightRowIndex,
pRightBlock->pBlockAgg[pOrder->slotId]);
&pRightBlock->pBlockAgg[pOrder->slotId]);
}
}
@ -742,7 +742,7 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
leftNull = colDataIsNull_t(pLeftColInfoData, pLeftSource->src.rowIndex, isVarType);
} else {
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex,
pLeftBlock->pBlockAgg[i]);
&pLeftBlock->pBlockAgg[i]);
}
}
@ -752,7 +752,7 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
rightNull = colDataIsNull_t(pRightColInfoData, pRightSource->src.rowIndex, isVarType);
} else {
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex,
pRightBlock->pBlockAgg[i]);
&pRightBlock->pBlockAgg[i]);
}
}

View File

@ -3631,7 +3631,7 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock*
SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, pCtx->saveHandle.pState->tsIndex);
ASSERT(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
key.groupId = pSrcBlock->info.id.groupId;
key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex);;
key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex);
}
char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);

View File

@ -702,23 +702,16 @@ static void doExtractVal(SColumnInfoData* pCol, int32_t i, int32_t end, SqlFunct
}
}
static int32_t saveRelatedTuple(SqlFunctionCtx* pCtx, SInputColumnInfoData* pInput, int32_t index, void* tval) {
static int32_t saveRelatedTupleTag(SqlFunctionCtx* pCtx, SInputColumnInfoData* pInput, void* tval) {
SColumnInfoData* pCol = pInput->pData[0];
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SMinmaxResInfo* pBuf = GET_ROWCELL_INTERBUF(pResInfo);
int32_t code = 0;
int32_t code = TSDB_CODE_SUCCESS;
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) {
code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
code = saveTupleData(pCtx, 0, pCtx->pSrcBlock, &pBuf->tuplePos);
}
return code;
}
@ -758,7 +751,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
pBuf->v = GET_INT64_VAL(tval);
}
code = saveRelatedTuple(pCtx, pInput, index, tval);
code = saveRelatedTupleTag(pCtx, pInput, tval);
} else {
if (IS_SIGNED_NUMERIC_TYPE(type)) {
int64_t prev = 0;
@ -767,7 +760,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
int64_t val = GET_INT64_VAL(tval);
if ((prev < val) ^ isMinFunc) {
GET_INT64_VAL(&pBuf->v) = val;
code = saveRelatedTuple(pCtx, pInput, index, tval);
code = saveRelatedTupleTag(pCtx, pInput, tval);
}
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
uint64_t prev = 0;
@ -776,7 +769,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
uint64_t val = GET_UINT64_VAL(tval);
if ((prev < val) ^ isMinFunc) {
GET_UINT64_VAL(&pBuf->v) = val;
code = saveRelatedTuple(pCtx, pInput, index, tval);
code = saveRelatedTupleTag(pCtx, pInput, tval);
}
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
double prev = 0;
@ -785,7 +778,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
double val = GET_DOUBLE_VAL(tval);
if ((prev < val) ^ isMinFunc) {
GET_DOUBLE_VAL(&pBuf->v) = val;
code = saveRelatedTuple(pCtx, pInput, index, tval);
code = saveRelatedTupleTag(pCtx, pInput, tval);
}
} else if (type == TSDB_DATA_TYPE_FLOAT) {
float prev = 0;
@ -794,7 +787,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
float val = GET_DOUBLE_VAL(tval);
if ((prev < val) ^ isMinFunc) {
GET_FLOAT_VAL(&pBuf->v) = val;
code = saveRelatedTuple(pCtx, pInput, index, tval);
code = saveRelatedTupleTag(pCtx, pInput, tval);
}
}
}

View File

@ -3782,7 +3782,7 @@ int32_t fltSclBuildRangeFromBlockSma(SFltSclColumnRange *colRange, SColumnDataAg
return TSDB_CODE_SUCCESS;
}
bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pDataStatis, int32_t numOfCols, int32_t numOfRows) {
bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows) {
if (info->scalarMode) {
SArray *colRanges = info->sclCtx.fltSclRange;
for (int32_t i = 0; i < taosArrayGetSize(colRanges); ++i) {
@ -3790,13 +3790,13 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pDataStatis, int32_t
bool foundCol = false;
int32_t j = 0;
for (; j < numOfCols; ++j) {
if (pDataStatis[j] != NULL && pDataStatis[j]->colId == colRange->colNode->colId) {
if (pDataStatis[j].colId == colRange->colNode->colId) {
foundCol = true;
break;
}
}
if (foundCol) {
SColumnDataAgg *pAgg = pDataStatis[j];
SColumnDataAgg *pAgg = &pDataStatis[j];
SArray *points = taosArrayInit(2, sizeof(SFltSclPoint));
fltSclBuildRangeFromBlockSma(colRange, pAgg, numOfRows, points);
qDebug("column data agg: nulls %d, rows %d, max %" PRId64 " min %" PRId64, pAgg->numOfNull, numOfRows,
@ -3833,7 +3833,7 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pDataStatis, int32_t
int32_t index = -1;
SFilterRangeCtx *ctx = info->colRange[k];
for (int32_t i = 0; i < numOfCols; ++i) {
if (pDataStatis[i] != NULL && pDataStatis[i]->colId == ctx->colId) {
if (pDataStatis[i].colId == ctx->colId) {
index = i;
break;
}
@ -3849,13 +3849,13 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pDataStatis, int32_t
break;
}
if (pDataStatis[index]->numOfNull <= 0) {
if (pDataStatis[index].numOfNull <= 0) {
if (ctx->isnull && !ctx->notnull && !ctx->isrange) {
ret = false;
break;
}
} else if (pDataStatis[index]->numOfNull > 0) {
if (pDataStatis[index]->numOfNull == numOfRows) {
} else if (pDataStatis[index].numOfNull > 0) {
if (pDataStatis[index].numOfNull == numOfRows) {
if ((ctx->notnull || ctx->isrange) && (!ctx->isnull)) {
ret = false;
break;
@ -3869,7 +3869,7 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pDataStatis, int32_t
}
}
SColumnDataAgg *pDataBlockst = pDataStatis[index];
SColumnDataAgg *pDataBlockst = &pDataStatis[index];
SFilterRangeNode *r = ctx->rs;
float minv = 0;

View File

@ -657,8 +657,10 @@ if __name__ == "__main__":
conn = taos.connect(host=f"{host}", config=tdDnodes.getSimCfgPath())
if fileName == "all":
tdLog.info("Procedures for testing runAllLinux")
tdCases.runAllLinux(conn)
else:
tdLog.info(f"Procedures for testing runOneLinux {fileName}")
tdCases.runOneLinux(conn, fileName, replicaVar)
# do restart option

View File

@ -536,6 +536,8 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/mavg.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_limit_interval.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_limit_interval.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row_interval.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max.py
@ -780,6 +782,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_null.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_limit_interval.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row_interval.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -Q 2
@ -875,6 +878,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_null.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_limit_interval.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row_interval.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -Q 3
@ -972,6 +976,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_null.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_limit_interval.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row_interval.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -Q 4

View File

@ -0,0 +1,105 @@
from util.log import *
from util.sql import *
from util.cases import *
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), True)
self.row_nums = 1000
self.tb_nums = 10
self.ts = 1537146000000
self.dbname = "db1"
self.stable = "meters"
def prepare_datas(self, stb_name , tb_nums , row_nums, dbname="db" ):
tdSql.execute(f'''create database {self.dbname} MAXROWS 4096 MINROWS 100''')
tdSql.execute(f'''use {self.dbname}''')
tdSql.execute(f'''CREATE STABLE {self.dbname}.{self.stable} (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` TINYINT, `location` VARCHAR(16))''')
for i in range(self.tb_nums):
tbname = f"{self.dbname}.sub_{self.stable}_{i}"
ts = self.ts + i*10000
tdSql.execute(f"create table {tbname} using {self.dbname}.{self.stable} tags({i} ,'nchar_{i}')")
tdLog.info(f"create table {tbname} using {self.dbname}.{self.stable} tags({i} ,'nchar_{i}')")
if i < (self.tb_nums - 2):
for row in range(row_nums):
ts = self.ts + row*1000
tdSql.execute(f"insert into {tbname} values({ts} , {row/10}, {215 + (row % 100)})")
for null in range(5):
ts = self.ts + row_nums*1000 + null*1000
tdSql.execute(f"insert into {tbname} values({ts} , NULL , NULL)")
def basic_query(self):
tdSql.query(f"select groupid, count(*) from {self.dbname}.{self.stable} partition by groupid interval(1d) limit 100")
tdSql.checkRows(8)
tdSql.checkData(0, 1, 1005)
tdSql.query(f"select groupid, count(*) from {self.dbname}.{self.stable} partition by tbname interval(1d) order by groupid limit 100;")
tdSql.checkRows(8)
tdSql.checkData(0, 0, 0)
tdSql.checkData(0, 1, 1005)
tdSql.checkData(7, 0, 7)
tdSql.checkData(7, 1, 1005)
tdSql.query(f"select groupid, count(*) from {self.dbname}.{self.stable} partition by tbname, groupid interval(5d) order by groupid limit 10")
tdSql.checkRows(8)
tdSql.checkData(0, 0, 0)
tdSql.checkData(0, 1, 1005)
tdSql.checkData(7, 0, 7)
tdSql.checkData(7, 1, 1005)
tdSql.query(f"select groupid, count(*), min(current) from {self.dbname}.{self.stable} partition by groupid interval(5d) order by groupid limit 10;")
tdSql.checkRows(8)
tdSql.checkData(0, 0, 0)
tdSql.checkData(0, 1, 1005)
tdSql.checkData(0, 2, 0)
tdSql.checkData(7, 0, 7)
tdSql.checkData(7, 1, 1005)
tdSql.checkData(7, 2, 0)
tdSql.query(f"select groupid, min(current) from {self.dbname}.{self.stable} partition by groupid interval(5d) limit 100;")
tdSql.checkRows(8)
tdSql.checkData(0, 1, 0)
tdSql.query(f"select groupid, avg(current) from {self.dbname}.{self.stable} partition by groupid interval(5d) limit 10000;")
tdSql.checkRows(8)
tdSql.checkData(0, 1, tdSql.getData(7, 1))
tdSql.query(f"select current, avg(current) from {self.dbname}.{self.stable} partition by current interval(5d) limit 100;")
tdSql.checkData(0, 0, tdSql.getData(0, 1))
tdSql.query(f"select groupid, last(voltage), min(current) from {self.dbname}.{self.stable} partition by groupid interval(5d) limit 10")
tdSql.checkRows(8)
tdSql.checkData(0, 1, tdSql.getData(7, 1))
tdSql.checkData(0, 2, tdSql.getData(7, 2))
tdSql.query(f"select groupid, min(current), min(voltage) from {self.dbname}.{self.stable} partition by tbname, groupid interval(5d) limit 100;")
tdSql.checkRows(8)
tdSql.checkData(0, 1, 0)
tdSql.checkData(0, 2, 215)
tdSql.checkData(7, 1, 0)
tdSql.checkData(7, 2, 215)
tdSql.query(f"select groupid, min(voltage), min(current) from {self.dbname}.{self.stable} partition by tbname, groupid interval(5d) limit 100;")
tdSql.checkRows(8)
tdSql.checkData(0, 2, 0)
tdSql.checkData(0, 1, 215)
tdSql.checkData(7, 2, 0)
tdSql.checkData(7, 1, 215)
def run(self):
tdSql.prepare()
self.prepare_datas("stb",self.tb_nums,self.row_nums)
self.basic_query()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
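The new case can also be run standalone through the existing test driver, e.g. python3 ./test.py -f 2-query/partition_limit_interval.py, matching the entries added to the CI task lists above and below.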

View File

@ -235,6 +235,8 @@ python3 ./test.py -f 2-query/mavg.py -P
python3 ./test.py -f 2-query/mavg.py -P -R
python3 ./test.py -f 2-query/max_partition.py -P
python3 ./test.py -f 2-query/max_partition.py -P -R
python3 ./test.py -f 2-query/partition_limit_interval.py -P
python3 ./test.py -f 2-query/partition_limit_interval.py -P -R
python3 ./test.py -f 2-query/max_min_last_interval.py -P
python3 ./test.py -f 2-query/last_row_interval.py -P
python3 ./test.py -f 2-query/max.py -P
@ -481,6 +483,7 @@ python3 ./test.py -f 2-query/irate.py -P -Q 2
python3 ./test.py -f 2-query/function_null.py -P -Q 2
python3 ./test.py -f 2-query/count_partition.py -P -Q 2
python3 ./test.py -f 2-query/max_partition.py -P -Q 2
python3 ./test.py -f 2-query/partition_limit_interval.py -P -Q 2
python3 ./test.py -f 2-query/max_min_last_interval.py -P -Q 2
python3 ./test.py -f 2-query/last_row_interval.py -P -Q 2
python3 ./test.py -f 2-query/last_row.py -P -Q 2
@ -576,6 +579,7 @@ python3 ./test.py -f 2-query/irate.py -P -Q 3
python3 ./test.py -f 2-query/function_null.py -P -Q 3
python3 ./test.py -f 2-query/count_partition.py -P -Q 3
python3 ./test.py -f 2-query/max_partition.py -P -Q 3
python3 ./test.py -f 2-query/partition_limit_interval.py -P -Q 3
python3 ./test.py -f 2-query/max_min_last_interval.py -P -Q 3
python3 ./test.py -f 2-query/last_row_interval.py -P -Q 3
python3 ./test.py -f 2-query/last_row.py -P -Q 3
@ -673,6 +677,7 @@ python3 ./test.py -f 2-query/irate.py -P -Q 4
python3 ./test.py -f 2-query/function_null.py -P -Q 4
python3 ./test.py -f 2-query/count_partition.py -P -Q 4
python3 ./test.py -f 2-query/max_partition.py -P -Q 4
python3 ./test.py -f 2-query/partition_limit_interval.py -P -Q 4
python3 ./test.py -f 2-query/max_min_last_interval.py -P -Q 4
python3 ./test.py -f 2-query/last_row_interval.py -P -Q 4
python3 ./test.py -f 2-query/last_row.py -P -Q 4