fix updateTupleData

This commit is contained in:
Ganlin Zhao 2023-01-04 17:18:14 +08:00
parent d5c92d3788
commit 6411e717df
1 changed files with 22 additions and 11 deletions

View File

@ -2024,20 +2024,19 @@ static void prepareBuf(SqlFunctionCtx* pCtx) {
static int32_t firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx,
SFirstLastRes* pInfo) {
int32_t code = TSDB_CODE_SUCCESS;
if (pCtx->subsidiaries.num <= 0) {
return TSDB_CODE_SUCCESS;
}
if (!pInfo->hasResult) {
int32_t code = saveTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = saveTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
} else {
updateTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
code = updateTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
}
return TSDB_CODE_SUCCESS;
return code;
}
static int32_t doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t currentTs, int32_t type, char* pData) {
@ -2965,7 +2964,10 @@ int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSD
// save the data of this tuple by over writing the old data
if (pCtx->subsidiaries.num > 0) {
updateTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
int32_t code = updateTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
#ifdef BUF_PAGE_DEBUG
qDebug("page_copyTuple pageId:%d, offset:%d", pItem->tuplePos.pageId, pItem->tuplePos.offset);
@ -3085,6 +3087,10 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock*
static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STuplePos* pPos) {
if (pHandle->pBuf != NULL) {
SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId);
if (pPage == NULL) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
memcpy(pPage->data + pPos->offset, pBuf, length);
setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage);
@ -3099,8 +3105,7 @@ int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBloc
prepareBuf(pCtx);
char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);
doUpdateTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, pPos);
return TSDB_CODE_SUCCESS;
return doUpdateTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, pPos);
}
static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPos) {
@ -4535,7 +4540,10 @@ static int32_t doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char*
if (j < pInfo->samples) {
sampleAssignResult(pInfo, data, j);
if (pCtx->subsidiaries.num > 0) {
updateTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[j]);
int32_t code = updateTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[j]);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
}
@ -4881,7 +4889,10 @@ static int32_t doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCt
} else {
(*pHashItem)->count += 1;
if (pCtx->subsidiaries.num > 0) {
updateTupleData(pCtx, rowIndex, pCtx->pSrcBlock, &((*pHashItem)->tuplePos));
int32_t code = updateTupleData(pCtx, rowIndex, pCtx->pSrcBlock, &((*pHashItem)->tuplePos));
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}