fix(query): fix the bug caused by refactor in first/last function implementation.
This commit is contained in:
parent
a6177f54ca
commit
a036d2f3ff
|
@ -196,7 +196,7 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);
|
|||
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
|
||||
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
|
||||
|
||||
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows);
|
||||
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, size_t existRows, uint32_t numOfRows);
|
||||
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
|
||||
|
||||
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows);
|
||||
|
|
|
@ -1076,8 +1076,8 @@ void blockDataCleanup(SSDataBlock* pDataBlock) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) {
|
||||
if (0 == numOfRows) {
|
||||
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, size_t existRows, uint32_t numOfRows) {
|
||||
if (0 == numOfRows || numOfRows <= existRows) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1088,19 +1088,16 @@ int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows)
|
|||
}
|
||||
|
||||
pColumn->varmeta.offset = (int32_t*)tmp;
|
||||
memset(pColumn->varmeta.offset, 0, sizeof(int32_t) * numOfRows);
|
||||
|
||||
pColumn->varmeta.length = 0;
|
||||
pColumn->varmeta.allocLen = 0;
|
||||
taosMemoryFreeClear(pColumn->pData);
|
||||
memset(&pColumn->varmeta.offset[existRows], 0, sizeof(int32_t) * (numOfRows - existRows));
|
||||
} else {
|
||||
char* tmp = taosMemoryRealloc(pColumn->nullbitmap, BitmapLen(numOfRows));
|
||||
if (tmp == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t oldLen = BitmapLen(existRows);
|
||||
pColumn->nullbitmap = tmp;
|
||||
memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows));
|
||||
memset(&pColumn->nullbitmap[oldLen], 0, BitmapLen(numOfRows) - oldLen);
|
||||
|
||||
if (pColumn->info.type == TSDB_DATA_TYPE_NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1136,7 +1133,7 @@ int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
|
|||
|
||||
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
|
||||
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||
code = colInfoDataEnsureCapacity(p, numOfRows);
|
||||
code = colInfoDataEnsureCapacity(p, pDataBlock->info.rows, numOfRows);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
@ -1181,7 +1178,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
|
|||
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
|
||||
SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||
|
||||
int32_t code = colInfoDataEnsureCapacity(pDst, pDataBlock->info.rows);
|
||||
int32_t code = colInfoDataEnsureCapacity(pDst, 0, pDataBlock->info.rows);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -141,7 +141,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
|
|||
colInfo.info.colId = pColSchema->colId;
|
||||
colInfo.info.type = pColSchema->type;
|
||||
|
||||
if (colInfoDataEnsureCapacity(&colInfo, numOfRows) < 0) {
|
||||
if (colInfoDataEnsureCapacity(&colInfo, 0, numOfRows) < 0) {
|
||||
taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock);
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -392,7 +392,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
|
|||
SColumnInfoData colInfo = {{0}, 0};
|
||||
colInfo.info = pCond->colList[i];
|
||||
|
||||
int32_t code = colInfoDataEnsureCapacity(&colInfo, pReadHandle->outputCapacity);
|
||||
int32_t code = colInfoDataEnsureCapacity(&colInfo, 0, pReadHandle->outputCapacity);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _end;
|
||||
}
|
||||
|
|
|
@ -832,7 +832,7 @@ static void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQuer
|
|||
pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
pColData->info.bytes = sizeof(int64_t);
|
||||
|
||||
colInfoDataEnsureCapacity(pColData, 5);
|
||||
colInfoDataEnsureCapacity(pColData, 0, 5);
|
||||
colDataAppendInt64(pColData, 0, &pQueryWindow->skey);
|
||||
colDataAppendInt64(pColData, 1, &pQueryWindow->ekey);
|
||||
|
||||
|
@ -1061,7 +1061,7 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc
|
|||
}
|
||||
|
||||
ASSERT(!IS_VAR_DATA_TYPE(type));
|
||||
colInfoDataEnsureCapacity(pColInfo, numOfRows);
|
||||
colInfoDataEnsureCapacity(pColInfo, 0, numOfRows);
|
||||
|
||||
if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
|
||||
int64_t v = pFuncParam->param.i;
|
||||
|
|
|
@ -767,10 +767,18 @@ void percentileFinalize(SqlFunctionCtx* pCtx) {
|
|||
|
||||
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
pEnv->calcMemSize = pNode->node.resType.bytes;
|
||||
pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t);
|
||||
return true;
|
||||
}
|
||||
|
||||
static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowIndex) {
|
||||
if (pTsColInfo == NULL) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return *(TSKEY*) colDataGetData(pTsColInfo, rowIndex);
|
||||
}
|
||||
|
||||
// This ordinary first function does not care if current scan is ascending order or descending order scan
|
||||
// the OPTIMIZED version of first function will only handle the ascending order scan
|
||||
int32_t firstFunction(SqlFunctionCtx *pCtx) {
|
||||
|
@ -792,8 +800,8 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
|
|||
|
||||
SColumnDataAgg* pColAgg = (pInput->colDataAggIsSet)? pInput->pColumnDataAgg[0]:NULL;
|
||||
|
||||
TSKEY startKey = *(int64_t*)(pInput->pPTS ? colDataGetData(pInput->pPTS, 0) : 0);
|
||||
TSKEY endKey = *(int64_t*)(pInput->pPTS ? colDataGetData(pInput->pPTS, pInput->totalRows - 1) : 0);
|
||||
TSKEY startKey = getRowPTs(pInput->pPTS, 0);
|
||||
TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1);
|
||||
|
||||
int32_t blockDataOrder = (startKey <= endKey)? TSDB_ORDER_ASC:TSDB_ORDER_DESC;
|
||||
|
||||
|
@ -812,12 +820,14 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
|
|||
}
|
||||
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
||||
|
||||
TSKEY cts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
|
||||
if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) < cts) {
|
||||
if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) {
|
||||
memcpy(buf, data, bytes);
|
||||
*(TSKEY*)(buf + bytes) = cts;
|
||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||
|
||||
pResInfo->numOfRes = 1;
|
||||
}
|
||||
|
||||
numOfElems++;
|
||||
|
@ -838,12 +848,13 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
|
|||
}
|
||||
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
||||
|
||||
TSKEY cts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
|
||||
if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) < cts) {
|
||||
if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) {
|
||||
memcpy(buf, data, bytes);
|
||||
*(TSKEY*)(buf + bytes) = cts;
|
||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||
pResInfo->numOfRes = 1;
|
||||
}
|
||||
|
||||
numOfElems++;
|
||||
|
|
|
@ -30,7 +30,7 @@ SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows) {
|
|||
pColumnData->info.scale = pType->scale;
|
||||
pColumnData->info.precision = pType->precision;
|
||||
|
||||
int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows);
|
||||
int32_t code = colInfoDataEnsureCapacity(pColumnData, 0, numOfRows);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFree(pColumnData);
|
||||
|
@ -45,7 +45,7 @@ int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out) {
|
|||
in.columnData = createColumnInfoData(&pValueNode->node.resType, 1);
|
||||
colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);
|
||||
|
||||
colInfoDataEnsureCapacity(out->columnData, 1);
|
||||
colInfoDataEnsureCapacity(out->columnData, 0, 1);
|
||||
int32_t code = vectorConvertImpl(&in, out);
|
||||
sclFreeParam(&in);
|
||||
|
||||
|
|
|
@ -155,7 +155,7 @@ void flttMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in
|
|||
res->info.numOfCols++;
|
||||
SColumnInfoData *pColumn = (SColumnInfoData *)taosArrayGetLast(res->pDataBlock);
|
||||
|
||||
colInfoDataEnsureCapacity(pColumn, rowNum);
|
||||
colInfoDataEnsureCapacity(pColumn, 0, rowNum);
|
||||
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
colDataAppend(pColumn, i, (const char *)value, false);
|
||||
|
|
|
@ -99,7 +99,7 @@ void scltAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *s
|
|||
SColumnInfoData idata = {0};
|
||||
idata.info = *colInfo;
|
||||
|
||||
colInfoDataEnsureCapacity(&idata, rows);
|
||||
colInfoDataEnsureCapacity(&idata, 0, rows);
|
||||
|
||||
taosArrayPush(res->pDataBlock, &idata);
|
||||
|
||||
|
@ -186,7 +186,7 @@ void scltMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in
|
|||
res->info.numOfCols++;
|
||||
SColumnInfoData *pColumn = (SColumnInfoData *)taosArrayGetLast(res->pDataBlock);
|
||||
|
||||
colInfoDataEnsureCapacity(pColumn, rowNum);
|
||||
colInfoDataEnsureCapacity(pColumn, 0, rowNum);
|
||||
|
||||
for (int32_t i = 0; i < rowNum; ++i) {
|
||||
colDataAppend(pColumn, i, (const char *)value, false);
|
||||
|
@ -1467,7 +1467,7 @@ void scltMakeDataBlock(SScalarParam **pInput, int32_t type, void *pVal, int32_t
|
|||
input->numOfRows = num;
|
||||
|
||||
input->columnData->info = createColumnInfo(0, type, bytes);
|
||||
colInfoDataEnsureCapacity(input->columnData, num);
|
||||
colInfoDataEnsureCapacity(input->columnData, 0, num);
|
||||
|
||||
if (setVal) {
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
|
|
Loading…
Reference in New Issue