fix: add pk to funcInputGetNextRow

This commit is contained in:
slzhou 2024-03-25 15:10:25 +08:00
parent 2cddeccfc7
commit a5ba546bd4
2 changed files with 37 additions and 21 deletions

View File

@ -185,6 +185,7 @@ typedef struct SFuncInputRow {
TSKEY ts; TSKEY ts;
bool isDataNull; bool isDataNull;
char* pData; char* pData;
char* pPk;
SSDataBlock* block; // prev row block or src block SSDataBlock* block; // prev row block or src block
int32_t rowIndex; // prev row block ? 0 : rowIndex in srcBlock int32_t rowIndex; // prev row block ? 0 : rowIndex in srcBlock
@ -198,7 +199,8 @@ typedef struct SFuncInputRowIter {
bool hasPrev; bool hasPrev;
SInputColumnInfoData* pInput; SInputColumnInfoData* pInput;
SColumnInfoData* pData; SColumnInfoData* pDataCol;
SColumnInfoData* pPkCol;
TSKEY* tsList; TSKEY* tsList;
int32_t rowIndex; int32_t rowIndex;
int32_t inputEndIndex; int32_t inputEndIndex;
@ -207,6 +209,7 @@ typedef struct SFuncInputRowIter {
TSKEY prevBlockTsEnd; TSKEY prevBlockTsEnd;
bool prevIsDataNull; bool prevIsDataNull;
char* pPrevData; char* pPrevData;
char* pPrevPk;
SSDataBlock* pPrevRowBlock; // pre one row block SSDataBlock* pPrevRowBlock; // pre one row block
//TODO: //TODO:

View File

@ -424,7 +424,8 @@ int32_t funcInputUpdate(SqlFunctionCtx* pCtx) {
if (!pCtx->bInputFinished) { if (!pCtx->bInputFinished) {
pIter->pInput = &pCtx->input; pIter->pInput = &pCtx->input;
pIter->tsList = (TSKEY*)pIter->pInput->pPTS->pData; pIter->tsList = (TSKEY*)pIter->pInput->pPTS->pData;
pIter->pData = pIter->pInput->pData[0]; pIter->pDataCol = pIter->pInput->pData[0];
pIter->pPkCol = pIter->pInput->pPrimaryKey;
pIter->rowIndex = pIter->pInput->startRowIndex; pIter->rowIndex = pIter->pInput->startRowIndex;
pIter->inputEndIndex = pIter->rowIndex + pIter->pInput->numOfRows - 1; pIter->inputEndIndex = pIter->rowIndex + pIter->pInput->numOfRows - 1;
pIter->pSrcBlock = pCtx->pSrcBlock; pIter->pSrcBlock = pCtx->pSrcBlock;
@ -454,10 +455,16 @@ bool funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow) {
if (pIter->prevBlockTsEnd == pIter->tsList[pIter->inputEndIndex]) { if (pIter->prevBlockTsEnd == pIter->tsList[pIter->inputEndIndex]) {
blockDataDestroy(pIter->pPrevRowBlock); blockDataDestroy(pIter->pPrevRowBlock);
pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1); pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1);
pIter->prevIsDataNull = colDataIsNull_f(pIter->pData->nullbitmap, pIter->inputEndIndex); pIter->prevIsDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->inputEndIndex);
pIter->pPrevData = taosMemoryMalloc(pIter->pData->info.bytes);
char* srcData = colDataGetData(pIter->pData, pIter->inputEndIndex); pIter->pPrevData = taosMemoryMalloc(pIter->pDataCol->info.bytes);
memcpy(pIter->pPrevData, srcData, pIter->pData->info.bytes); char* srcData = colDataGetData(pIter->pDataCol, pIter->inputEndIndex);
memcpy(pIter->pPrevData, srcData, pIter->pDataCol->info.bytes);
pIter->pPrevPk = taosMemoryMalloc(pIter->pPkCol->info.bytes);
char* pkData = colDataGetData(pIter->pPkCol, pIter->inputEndIndex);
memcpy(pIter->pPrevPk, pkData, pIter->pPkCol->info.bytes);
pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1); pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1);
pIter->hasPrev = true; pIter->hasPrev = true;
@ -475,8 +482,9 @@ bool funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow) {
pRow->rowIndex = 0; pRow->rowIndex = 0;
} else { } else {
pRow->ts = pIter->tsList[idx - 1]; pRow->ts = pIter->tsList[idx - 1];
pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, idx - 1); pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, idx - 1);
pRow->pData = colDataGetData(pIter->pData, idx - 1); pRow->pData = colDataGetData(pIter->pDataCol, idx - 1);
pRow->pPk = colDataGetData(pIter->pPkCol, idx - 1);
pRow->block = pIter->pSrcBlock; pRow->block = pIter->pSrcBlock;
pRow->rowIndex = idx - 1; pRow->rowIndex = idx - 1;
} }
@ -492,20 +500,22 @@ bool funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow) {
++idx; ++idx;
} }
pRow->ts = pIter->tsList[idx]; pRow->ts = pIter->tsList[idx];
pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, idx); pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, idx);
pRow->pData = colDataGetData(pIter->pData, pIter->rowIndex); pRow->pData = colDataGetData(pIter->pDataCol, idx);
pRow->pPk = colDataGetData(pIter->pPkCol, idx);
pRow->block = pIter->pSrcBlock; pRow->block = pIter->pSrcBlock;
pRow->rowIndex = idx;
pIter->rowIndex = idx + 1; pIter->rowIndex = idx + 1;
return true; return true;
} else { } else {
pIter->hasPrev = true; pIter->hasPrev = true;
pIter->prevBlockTsEnd = tsEnd; pIter->prevBlockTsEnd = tsEnd;
// TODO pIter->prevIsDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->inputEndIndex);
pIter->prevIsDataNull = colDataIsNull_f(pIter->pData->nullbitmap, pIter->inputEndIndex); pIter->pPrevData = taosMemoryMalloc(pIter->pDataCol->info.bytes);
pIter->pPrevData = taosMemoryMalloc(pIter->pData->info.bytes); memcpy(pIter->pPrevData, colDataGetData(pIter->pDataCol, pIter->inputEndIndex), pIter->pDataCol->info.bytes);
memcpy(pIter->pPrevData, colDataGetData(pIter->pData, pIter->inputEndIndex), pIter->pData->info.bytes); pIter->pPrevPk = taosMemoryMalloc(pIter->pPkCol->info.bytes);
memcpy(pIter->pPrevPk, colDataGetData(pIter->pPkCol, pIter->inputEndIndex), pIter->pPkCol->info.bytes);
pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1); pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1);
return false; return false;
} }
@ -523,8 +533,9 @@ bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
++idx; ++idx;
} }
pRow->ts = pIter->tsList[idx]; pRow->ts = pIter->tsList[idx];
pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, idx); pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, idx);
pRow->pData = colDataGetData(pIter->pData, idx); pRow->pData = colDataGetData(pIter->pDataCol, idx);
pRow->pPk = colDataGetData(pIter->pPkCol, idx);
pRow->block = pIter->pSrcBlock; pRow->block = pIter->pSrcBlock;
pRow->rowIndex = idx; pRow->rowIndex = idx;
@ -535,8 +546,9 @@ bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
} else { } else {
if (pIter->rowIndex <= pIter->inputEndIndex) { if (pIter->rowIndex <= pIter->inputEndIndex) {
pRow->ts = pIter->tsList[pIter->rowIndex]; pRow->ts = pIter->tsList[pIter->rowIndex];
pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, pIter->rowIndex); pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->rowIndex);
pRow->pData = colDataGetData(pIter->pData, pIter->rowIndex); pRow->pData = colDataGetData(pIter->pDataCol, pIter->rowIndex);
pRow->pPk = colDataGetData(pIter->pPkCol, pIter->rowIndex);
pRow->block = pIter->pSrcBlock; pRow->block = pIter->pSrcBlock;
pRow->rowIndex = pIter->rowIndex; pRow->rowIndex = pIter->rowIndex;
@ -563,8 +575,9 @@ bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
bool funcInputGetNextRowNoPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) { bool funcInputGetNextRowNoPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
if (pIter->rowIndex <= pIter->inputEndIndex) { if (pIter->rowIndex <= pIter->inputEndIndex) {
pRow->ts = pIter->tsList[pIter->rowIndex]; pRow->ts = pIter->tsList[pIter->rowIndex];
pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, pIter->rowIndex); pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->rowIndex);
pRow->pData = colDataGetData(pIter->pData, pIter->rowIndex); pRow->pData = colDataGetData(pIter->pDataCol, pIter->rowIndex);
pRow->pPk = NULL;
pRow->block = pIter->pSrcBlock; pRow->block = pIter->pSrcBlock;
pRow->rowIndex = pIter->rowIndex; pRow->rowIndex = pIter->rowIndex;