Merge pull request #19044 from taosdata/feature/3_liaohj
fix(query): check for buffer in first/last merge.
This commit is contained in:
commit
a4301cc962
|
@ -1995,6 +1995,22 @@ static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowInde
|
|||
return *(TSKEY*)colDataGetData(pTsColInfo, rowIndex);
|
||||
}
|
||||
|
||||
static void prepareBuf(SqlFunctionCtx* pCtx) {
|
||||
if (pCtx->subsidiaries.rowLen == 0) {
|
||||
int32_t rowLen = 0;
|
||||
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||
rowLen += pc->pExpr->base.resSchema.bytes;
|
||||
}
|
||||
|
||||
pCtx->subsidiaries.rowLen = rowLen + pCtx->subsidiaries.num * sizeof(bool);
|
||||
pCtx->subsidiaries.buf = taosMemoryMalloc(pCtx->subsidiaries.rowLen);
|
||||
}
|
||||
|
||||
ASSERT(pCtx->subsidiaries.buf != NULL);
|
||||
ASSERT(pCtx->subsidiaries.rowLen > 0);
|
||||
}
|
||||
|
||||
static void firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx,
|
||||
SFirstLastRes* pInfo) {
|
||||
if (pCtx->subsidiaries.num <= 0) {
|
||||
|
@ -2003,8 +2019,6 @@ static void firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowInde
|
|||
|
||||
if (!pInfo->hasResult) {
|
||||
pInfo->pos = saveTupleData(pCtx, rowIndex, pSrcBlock, NULL);
|
||||
ASSERT(pCtx->subsidiaries.buf != NULL);
|
||||
ASSERT(pCtx->subsidiaries.rowLen > 0);
|
||||
} else {
|
||||
updateTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
|
||||
}
|
||||
|
@ -2960,16 +2974,7 @@ static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf
|
|||
}
|
||||
|
||||
STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, const STupleKey* pKey) {
|
||||
if (pCtx->subsidiaries.rowLen == 0) {
|
||||
int32_t rowLen = 0;
|
||||
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||
rowLen += pc->pExpr->base.resSchema.bytes;
|
||||
}
|
||||
|
||||
pCtx->subsidiaries.rowLen = rowLen + pCtx->subsidiaries.num * sizeof(bool);
|
||||
pCtx->subsidiaries.buf = taosMemoryMalloc(pCtx->subsidiaries.rowLen);
|
||||
}
|
||||
prepareBuf(pCtx);
|
||||
|
||||
char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);
|
||||
return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, pKey);
|
||||
|
@ -2989,6 +2994,8 @@ static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf
|
|||
}
|
||||
|
||||
int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
|
||||
prepareBuf(pCtx);
|
||||
|
||||
char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);
|
||||
doUpdateTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, pPos);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
Loading…
Reference in New Issue