Merge branch 'feat/TS-4243-3.0' of github.com:taosdata/TDengine into TEST/3.0/TS-4243

This commit is contained in:
Chris Zhai 2024-04-11 17:35:34 +08:00
commit 3ce30b4606
4 changed files with 37 additions and 12 deletions

View File

@ -1502,10 +1502,12 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
pVal->type = pDataBlock->info.pks[0].type;
pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[0].nData);
pVal->nData = pDataBlock->info.pks[0].nData;
pVal = &pBlock->info.pks[1];
pVal->type = pDataBlock->info.pks[1].type;
pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData);
pVal->nData = pDataBlock->info.pks[1].nData;
}
if (copyData) {

View File

@ -179,9 +179,8 @@ static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) {
}
if (IS_VAR_DATA_TYPE(indices[i].type)) {
tGetU32v(pKey->pks[i].pData, &pKey->pks[i].nData);
pKey->pks[i].pData = memcpy(pKey->pks[i].pData, tdata, pKey->pks[i].nData);
pKey->pks[i].pData += pKey->pks[i].nData;
tdata += tGetU32v(tdata, &pKey->pks[i].nData);
memcpy(pKey->pks[i].pData, tdata, pKey->pks[i].nData);
} else {
memcpy(&pKey->pks[i].val, data + indices[i].offset, tDataTypes[pKey->pks[i].type].bytes);
}
@ -485,7 +484,7 @@ void tsdbReleaseDataBlock2(STsdbReader* pReader) {
}
static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacity, SSDataBlock* pResBlock,
SQueryTableDataCond* pCond) {
SQueryTableDataCond* pCond, SBlockLoadSuppInfo* pSup) {
pResBlockInfo->capacity = capacity;
pResBlockInfo->pResBlock = pResBlock;
terrno = 0;
@ -493,6 +492,28 @@ static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacit
if (pResBlockInfo->pResBlock == NULL) {
pResBlockInfo->freeBlock = true;
pResBlockInfo->pResBlock = createResBlock(pCond, pResBlockInfo->capacity);
if (pSup->numOfPks > 0) {
SSDataBlock* p = pResBlockInfo->pResBlock;
p->info.pks[0].type = pSup->pk.type;
p->info.pks[1].type = pSup->pk.type;
if (IS_VAR_DATA_TYPE(pSup->pk.type)) {
p->info.pks[0].pData = taosMemoryCalloc(1, pSup->pk.bytes);
if (p->info.pks[0].pData == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
p->info.pks[1].pData = taosMemoryCalloc(1, pSup->pk.bytes);
if (p->info.pks[1].pData == NULL) {
taosMemoryFreeClear(p->info.pks[0].pData);
return TSDB_CODE_OUT_OF_MEMORY;
}
p->info.pks[0].nData = pSup->pk.bytes;
p->info.pks[1].nData = pSup->pk.bytes;
}
}
} else {
pResBlockInfo->freeBlock = false;
}
@ -525,14 +546,9 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL;
pReader->type = pCond->type;
pReader->bFilesetDelimited = false;
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
if (pCond->numOfCols <= 0) {
tsdbError("vgId:%d, invalid column number %d in query cond, %s", TD_VID(pVnode), pCond->numOfCols, idstr);
code = TSDB_CODE_INVALID_PARA;
@ -548,6 +564,11 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
pReader->pkComparFn = getComparFunc(pSup->pk.type, 0);
}
code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond, pSup);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
code = tBlockDataCreate(&pReader->status.fileBlockData);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
@ -569,8 +590,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
goto _end;
}
pReader->bFilesetDelimited = false;
tsdbInitReaderLock(pReader);
tsem_init(&pReader->resumeAfterSuspend, 0, 0);

View File

@ -276,6 +276,8 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo)
pBlockInfo->pks[0].nData = pInfoData->info.bytes;
pBlockInfo->pks[1].nData = pInfoData->info.bytes;
}
break;
}
}

View File

@ -4133,6 +4133,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
if (pInfo->rtnNextDurationBlocks) {
qDebug("%s table merge scan return already fetched new duration blocks. index %d num of blocks %d",
GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks);
if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) {
pBlock = pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx];
++pInfo->nextDurationBlocksIdx;
@ -4141,6 +4142,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
blockDataDestroy(pInfo->nextDurationBlocks[i]);
pInfo->nextDurationBlocks[i] = NULL;
}
pInfo->rtnNextDurationBlocks = false;
pInfo->nextDurationBlocksIdx = 0;
pInfo->numNextDurationBlocks = 0;