fix: slot id

This commit is contained in:
factosea 2024-06-04 19:05:19 +08:00
parent c42e627a41
commit 7a8e87f8cd
3 changed files with 65 additions and 59 deletions

View File

@ -256,7 +256,6 @@ SSDataBlock* createDataBlock();
void* blockDataDestroy(SSDataBlock* pBlock); void* blockDataDestroy(SSDataBlock* pBlock);
void blockDataFreeRes(SSDataBlock* pBlock); void blockDataFreeRes(SSDataBlock* pBlock);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
SSDataBlock* createBlockDataNotLoaded(SSDataBlock* pDataBlock);
SSDataBlock* createSpecialDataBlock(EStreamType type); SSDataBlock* createSpecialDataBlock(EStreamType type);
SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx); SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx);
@ -283,6 +282,8 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* p
void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList); void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList);
void copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -21,7 +21,7 @@
#define MALLOC_ALIGN_BYTES 32 #define MALLOC_ALIGN_BYTES 32
static void copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc);
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) { int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
@ -1733,62 +1733,6 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
return pDstBlock; return pDstBlock;
} }
SSDataBlock* createBlockDataNotLoaded(SSDataBlock* pDataBlock) {
if (pDataBlock == NULL) {
return NULL;
}
SSDataBlock* pDstBlock = createDataBlock();
pDstBlock->info = pDataBlock->info;
pDstBlock->info.rows = 0;
pDstBlock->info.capacity = 0;
pDstBlock->info.rowSize = 0;
pDstBlock->info.id = pDataBlock->info.id;
pDstBlock->info.blankFill = pDataBlock->info.blankFill;
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
SColumnInfoData colInfo = {.hasNull = true, .info = p->info};
blockDataAppendColInfo(pDstBlock, &colInfo);
}
copyPkVal(&pDstBlock->info, &pDataBlock->info);
int32_t code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
blockDataDestroy(pDstBlock);
return NULL;
}
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info);
}
pDstBlock->info.rows = pDataBlock->info.rows;
pDstBlock->info.capacity = pDataBlock->info.rows;
pDstBlock->pBlockAgg = pDataBlock->pBlockAgg;
pDataBlock->pBlockAgg = NULL;
// int numOfSlots = sizeof(pDataBlock->pBlockAgg)/POINTER_BYTES;
// if (pDataBlock->pBlockAgg != NULL) {
// pDstBlock->pBlockAgg = taosMemoryCalloc(numOfSlots, POINTER_BYTES);
// if (pDstBlock->pBlockAgg == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
// return NULL;
// }
// for (int j = 0; j < numOfSlots; ++j) {
// pDstBlock->pBlockAgg[j] = &(*pDataBlock->pBlockAgg)[j];
// }
// }
return pDstBlock;
}
SSDataBlock* createDataBlock() { SSDataBlock* createDataBlock() {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pBlock == NULL) { if (pBlock == NULL) {

View File

@ -565,6 +565,67 @@ _error:
return NULL; return NULL;
} }
SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBlock* pDataBlock) {
if (pDataBlock == NULL) {
return NULL;
}
SSDataBlock* pDstBlock = createDataBlock();
pDstBlock->info = pDataBlock->info;
pDstBlock->info.rows = 0;
pDstBlock->info.capacity = 0;
pDstBlock->info.rowSize = 0;
pDstBlock->info.id = pDataBlock->info.id;
pDstBlock->info.blankFill = pDataBlock->info.blankFill;
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
pDstBlock->pBlockAgg = taosMemoryCalloc(numOfCols, sizeof(SColumnDataAgg));
if (pDstBlock->pBlockAgg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
copyPkVal(&pDstBlock->info, &pDataBlock->info);
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, slotId);
if (pSrc) {
SColumnInfoData colInfo = {.hasNull = true, .info = pSrc->info};
blockDataAppendColInfo(pDstBlock, &colInfo);
}
}
int32_t code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
blockDataDestroy(pDstBlock);
return NULL;
}
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
if (slotId < numOfCols) {
pDstBlock->pBlockAgg[slotId] = pDataBlock->pBlockAgg[i];
pDstBlock->pBlockAgg[slotId].colId = i;
}
// SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, slotId);
// if (pSrc) {
// SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i);
// colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info);
// }
}
pDstBlock->info.rows = pDataBlock->info.rows;
pDstBlock->info.capacity = pDataBlock->info.rows;
return pDstBlock;
}
static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SPartitionOperatorInfo* pInfo = pOperator->info; SPartitionOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -653,7 +714,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
setBufPageDirty(pPage, true); setBufPageDirty(pPage, true);
releaseBufPage(pInfo->pBuf, pPage); releaseBufPage(pInfo->pBuf, pPage);
} else { } else {
SSDataBlock* dataNotLoadBlock = createBlockDataNotLoaded(pBlock); SSDataBlock* dataNotLoadBlock = createBlockDataNotLoaded(pOperator, pBlock);
if (dataNotLoadBlock == NULL) { if (dataNotLoadBlock == NULL) {
T_LONG_JMP(pTaskInfo->env, terrno); T_LONG_JMP(pTaskInfo->env, terrno);
} }