fix(query): check for buffer in first/last merge.
This commit is contained in:
parent
0f60ddb3cd
commit
7059d71e1f
|
@ -1995,16 +1995,31 @@ static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowInde
|
||||||
return *(TSKEY*)colDataGetData(pTsColInfo, rowIndex);
|
return *(TSKEY*)colDataGetData(pTsColInfo, rowIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void prepareBuf(SqlFunctionCtx* pCtx) {
|
||||||
|
if (pCtx->subsidiaries.rowLen == 0) {
|
||||||
|
int32_t rowLen = 0;
|
||||||
|
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
||||||
|
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||||
|
rowLen += pc->pExpr->base.resSchema.bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
pCtx->subsidiaries.rowLen = rowLen + pCtx->subsidiaries.num * sizeof(bool);
|
||||||
|
pCtx->subsidiaries.buf = taosMemoryMalloc(pCtx->subsidiaries.rowLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(pCtx->subsidiaries.buf != NULL);
|
||||||
|
ASSERT(pCtx->subsidiaries.rowLen > 0);
|
||||||
|
}
|
||||||
|
|
||||||
static void firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx,
|
static void firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx,
|
||||||
SFirstLastRes* pInfo) {
|
SFirstLastRes* pInfo) {
|
||||||
if (pCtx->subsidiaries.num <= 0) {
|
if (pCtx->subsidiaries.num <= 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
prepareBuf(pCtx);
|
||||||
if (!pInfo->hasResult) {
|
if (!pInfo->hasResult) {
|
||||||
pInfo->pos = saveTupleData(pCtx, rowIndex, pSrcBlock, NULL);
|
pInfo->pos = saveTupleData(pCtx, rowIndex, pSrcBlock, NULL);
|
||||||
ASSERT(pCtx->subsidiaries.buf != NULL);
|
|
||||||
ASSERT(pCtx->subsidiaries.rowLen > 0);
|
|
||||||
} else {
|
} else {
|
||||||
updateTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
|
updateTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
|
||||||
}
|
}
|
||||||
|
@ -2960,17 +2975,6 @@ static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf
|
||||||
}
|
}
|
||||||
|
|
||||||
STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, const STupleKey* pKey) {
|
STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, const STupleKey* pKey) {
|
||||||
if (pCtx->subsidiaries.rowLen == 0) {
|
|
||||||
int32_t rowLen = 0;
|
|
||||||
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
|
||||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
|
||||||
rowLen += pc->pExpr->base.resSchema.bytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
pCtx->subsidiaries.rowLen = rowLen + pCtx->subsidiaries.num * sizeof(bool);
|
|
||||||
pCtx->subsidiaries.buf = taosMemoryMalloc(pCtx->subsidiaries.rowLen);
|
|
||||||
}
|
|
||||||
|
|
||||||
char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);
|
char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);
|
||||||
return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, pKey);
|
return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, pKey);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue