fix(tsdb): prepare the pk buf for blocks generated by reader.

This commit is contained in:
Haojun Liao 2024-04-11 17:09:07 +08:00
parent 1320c1c0e1
commit b4ea80b637
3 changed files with 33 additions and 9 deletions

View File

@ -1502,10 +1502,12 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
pVal->type = pDataBlock->info.pks[0].type; pVal->type = pDataBlock->info.pks[0].type;
pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[0].nData); pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[0].nData);
pVal->nData = pDataBlock->info.pks[0].nData;
pVal = &pBlock->info.pks[1]; pVal = &pBlock->info.pks[1];
pVal->type = pDataBlock->info.pks[1].type; pVal->type = pDataBlock->info.pks[1].type;
pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData); pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData);
pVal->nData = pDataBlock->info.pks[1].nData;
} }
if (copyData) { if (copyData) {

View File

@ -485,7 +485,7 @@ void tsdbReleaseDataBlock2(STsdbReader* pReader) {
} }
static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacity, SSDataBlock* pResBlock, static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacity, SSDataBlock* pResBlock,
SQueryTableDataCond* pCond) { SQueryTableDataCond* pCond, SBlockLoadSuppInfo* pSup) {
pResBlockInfo->capacity = capacity; pResBlockInfo->capacity = capacity;
pResBlockInfo->pResBlock = pResBlock; pResBlockInfo->pResBlock = pResBlock;
terrno = 0; terrno = 0;
@ -493,6 +493,28 @@ static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacit
if (pResBlockInfo->pResBlock == NULL) { if (pResBlockInfo->pResBlock == NULL) {
pResBlockInfo->freeBlock = true; pResBlockInfo->freeBlock = true;
pResBlockInfo->pResBlock = createResBlock(pCond, pResBlockInfo->capacity); pResBlockInfo->pResBlock = createResBlock(pCond, pResBlockInfo->capacity);
if (pSup->numOfPks > 0) {
SSDataBlock* p = pResBlockInfo->pResBlock;
p->info.pks[0].type = pSup->pk.type;
p->info.pks[1].type = pSup->pk.type;
if (IS_VAR_DATA_TYPE(pSup->pk.type)) {
p->info.pks[0].pData = taosMemoryCalloc(1, pSup->pk.bytes);
if (p->info.pks[0].pData == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
p->info.pks[1].pData = taosMemoryCalloc(1, pSup->pk.bytes);
if (p->info.pks[1].pData == NULL) {
taosMemoryFreeClear(p->info.pks[0].pData);
return TSDB_CODE_OUT_OF_MEMORY;
}
p->info.pks[0].nData = pSup->pk.bytes;
p->info.pks[1].nData = pSup->pk.bytes;
}
}
} else { } else {
pResBlockInfo->freeBlock = false; pResBlockInfo->freeBlock = false;
} }
@ -525,14 +547,9 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL; pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL;
pReader->type = pCond->type; pReader->type = pCond->type;
pReader->bFilesetDelimited = false;
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
if (pCond->numOfCols <= 0) { if (pCond->numOfCols <= 0) {
tsdbError("vgId:%d, invalid column number %d in query cond, %s", TD_VID(pVnode), pCond->numOfCols, idstr); tsdbError("vgId:%d, invalid column number %d in query cond, %s", TD_VID(pVnode), pCond->numOfCols, idstr);
code = TSDB_CODE_INVALID_PARA; code = TSDB_CODE_INVALID_PARA;
@ -548,6 +565,11 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
pReader->pkComparFn = getComparFunc(pSup->pk.type, 0); pReader->pkComparFn = getComparFunc(pSup->pk.type, 0);
} }
code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond, pSup);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
code = tBlockDataCreate(&pReader->status.fileBlockData); code = tBlockDataCreate(&pReader->status.fileBlockData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
@ -569,8 +591,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
goto _end; goto _end;
} }
pReader->bFilesetDelimited = false;
tsdbInitReaderLock(pReader); tsdbInitReaderLock(pReader);
tsem_init(&pReader->resumeAfterSuspend, 0, 0); tsem_init(&pReader->resumeAfterSuspend, 0, 0);

View File

@ -276,6 +276,8 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo)
pBlockInfo->pks[0].nData = pInfoData->info.bytes; pBlockInfo->pks[0].nData = pInfoData->info.bytes;
pBlockInfo->pks[1].nData = pInfoData->info.bytes; pBlockInfo->pks[1].nData = pInfoData->info.bytes;
} }
break;
} }
} }