From 11ed8a1540fe444531d50ad920a1b49e919e894b Mon Sep 17 00:00:00 2001 From: Jing Sima Date: Wed, 25 Sep 2024 16:14:47 +0800 Subject: [PATCH] fix:[TD-32184] Support backwards compatibility for function's result info. --- include/libs/function/function.h | 12 ++ source/libs/executor/inc/executil.h | 4 + source/libs/executor/src/executil.c | 116 ++++++++++++++++++ .../executor/src/streamtimewindowoperator.c | 5 + 4 files changed, 137 insertions(+) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 7ca046762a..c66d74a905 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -29,6 +29,7 @@ struct SqlFunctionCtx; struct SResultRowEntryInfo; struct SFunctionNode; +struct SExprSupp; typedef struct SScalarParam SScalarParam; typedef struct SStreamState SStreamState; @@ -43,6 +44,7 @@ typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx); typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock); typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); typedef int32_t (*FExecCombine)(struct SqlFunctionCtx *pDestCtx, struct SqlFunctionCtx *pSourceCtx); +typedef int32_t (*FExecDecode)(struct SqlFunctionCtx *pCtx, const char *buf, struct SResultRowEntryInfo *pResultCellInfo, int32_t version); typedef int32_t (*processFuncByRow)(SArray* pCtx); // array of SqlFunctionCtx typedef struct SScalarFuncExecFuncs { @@ -57,6 +59,7 @@ typedef struct SFuncExecFuncs { FExecFinalize finalize; FExecCombine combine; FExecCleanUp cleanup; + FExecDecode decode; processFuncByRow processFuncByRow; } SFuncExecFuncs; @@ -65,6 +68,8 @@ typedef struct SFuncExecFuncs { #define TOP_BOTTOM_QUERY_LIMIT 100 #define FUNCTIONS_NAME_MAX_LENGTH 32 +#define FUNCTION_RESULT_INFO_VERSION 1 + typedef struct SResultRowEntryInfo { bool initialized : 1; // output buffer has been initialized bool complete : 1; // query has completed @@ -165,6 +170,11 @@ typedef struct STdbState { void *txn; } STdbState; +typedef struct SResultRowStore { + int32_t (*resultRowPut)(struct SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize); + int32_t (*resultRowGet)(struct SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize); +} SResultRowStore; + struct SStreamState { STdbState *pTdbState; struct SStreamFileState *pFileState; @@ -175,6 +185,8 @@ struct SStreamState { int64_t streamBackendRid; int8_t dump; int32_t tsIndex; + SResultRowStore *pResultRowStore; + struct SExprSupp *pExprSupp; }; typedef struct SFunctionStateStore { diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 95035dd96f..9e36a29476 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -48,6 +48,7 @@ typedef struct SGroupResInfo { } SGroupResInfo; typedef struct SResultRow { + int32_t version; int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer int32_t offset : 29; // row index in buffer page bool startInterp; // the time window start timestamp has done the interpolation already. @@ -152,6 +153,9 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo return pRow; } +int32_t getResultRowFromBuf(struct SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize); +int32_t putResultRowToBuf(struct SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize); + int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 4fe45ff72e..141d64bfd1 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -93,6 +93,122 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) { return rowSize; } +// Convert buf read from rocksdb to result row +int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize) { + SqlFunctionCtx *pCtx = pSup->pCtx; + int32_t *offset = pSup->rowEntryInfoOffset; + SResultRow *pResultRow = (SResultRow*)outBuf; + size_t processedSize = 0; + int32_t code = TSDB_CODE_SUCCESS; + if (inBuf == NULL) { + qError("invalid input buffer, inBuf:%p", inBuf); + return TSDB_CODE_INVALID_PARA; + } + + // calculate the size of output buffer + *outBufSize = getResultRowSize(pCtx, pSup->numOfExprs); + *outBuf = taosMemoryMalloc(*outBufSize); + if (*outBuf == NULL) { + qError("failed to allocate memory for output buffer, size:%zu", *outBufSize); + return terrno; + } + (void)memcpy(pResultRow, inBuf, sizeof(SResultRow)); + inBuf += sizeof(SResultRow); + processedSize += sizeof(SResultRow); + for (int32_t i = 0; i < pSup->numOfExprs; ++i) { + int32_t len = *(int32_t*)inBuf; + inBuf += sizeof(int32_t); + processedSize += sizeof(int32_t); + if (pCtx->fpSet.decode) { + code = pCtx->fpSet.decode(&pCtx[i], inBuf, getResultEntryInfo(pResultRow, i, offset), pResultRow->version); + if (code != TSDB_CODE_SUCCESS) { + qError("failed to decode result row, code:%d", code); + return code; + } + } else { + (void)memcpy(getResultEntryInfo(pResultRow, i, offset), inBuf, len); + } + inBuf += len; + processedSize += len; + } + void *pos = getResultEntryInfo(pResultRow, pSup->numOfExprs - 1, offset) + + sizeof(SResultRowEntryInfo) + + pCtx[pSup->numOfExprs - 1].resDataInfo.interBufSize; + (void)memcpy(pos, inBuf, pSup->numOfExprs * sizeof(bool)); + inBuf += pSup->numOfExprs * sizeof(bool); + processedSize += pSup->numOfExprs * sizeof(bool); + + if (processedSize < inBufSize) { + // stream stores extra data after result row + size_t leftLen = inBufSize - processedSize; + TAOS_MEMORY_REALLOC(*outBuf, *outBufSize + leftLen); + if (*outBuf == NULL) { + qError("failed to reallocate memory for output buffer, size:%zu", *outBufSize + leftLen); + return terrno; + } + (void)memcpy(outBuf + processedSize, inBuf, leftLen); + inBuf += leftLen; + processedSize += leftLen; + *outBufSize += leftLen; + } + return TSDB_CODE_SUCCESS; +} + +// Convert result row to buf for rocksdb +int32_t putResultRowToBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize) { + SqlFunctionCtx *pCtx = pSup->pCtx; + int32_t *offset = pSup->rowEntryInfoOffset; + SResultRow *pResultRow = (SResultRow*)inBuf; + size_t rowSize = getResultRowSize(pCtx, pSup->numOfExprs); + + if (inBuf == NULL) { + qError("invalid input buffer, inBuf:%p", inBuf); + return TSDB_CODE_INVALID_PARA; + } + if (rowSize > inBufSize) { + qError("invalid input buffer size, rowSize:%zu, inBufSize:%zu", rowSize, inBufSize); + return TSDB_CODE_INVALID_PARA; + } + + // calculate the size of output buffer + *outBufSize = rowSize + sizeof(int32_t) * pSup->numOfExprs; + if (rowSize < inBufSize) { + *outBufSize += inBufSize - rowSize; + } + + *outBuf = taosMemoryMalloc(*outBufSize); + if (*outBuf == NULL) { + qError("failed to allocate memory for output buffer, size:%zu", *outBufSize); + return terrno; + } + + pResultRow->version = FUNCTION_RESULT_INFO_VERSION; + (void)memcpy(outBuf, pResultRow, sizeof(SResultRow)); + outBuf += sizeof(SResultRow); + for (int32_t i = 0; i < pSup->numOfExprs; ++i) { + *(int32_t *) outBuf = offset[i]; + outBuf += sizeof(int32_t); + size_t len = sizeof(SResultRowEntryInfo) + pCtx[i].resDataInfo.interBufSize; + (void)memcpy(outBuf, getResultEntryInfo(pResultRow, i, offset), len); + outBuf += len; + } + + // mark if col is null for top/bottom result(saveTupleData) + void *pos = getResultEntryInfo(pResultRow, pSup->numOfExprs - 1, offset) + + sizeof(SResultRowEntryInfo) + + pCtx[pSup->numOfExprs - 1].resDataInfo.interBufSize; + + (void)memcpy(outBuf, pos, pSup->numOfExprs * sizeof(bool)); + + if (rowSize < inBufSize) { + // stream stores extra data after result row + size_t leftLen = inBufSize - rowSize; + (void)memcpy(outBuf, inBuf + rowSize, leftLen); + outBuf += leftLen; + } + return TSDB_CODE_SUCCESS; +} + static void freeEx(void* p) { taosMemoryFree(*(void**)p); } void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) { diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index be27f277c0..cf539cb4cb 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2232,6 +2232,11 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in pSup->pResultRows = tSimpleHashInit(32, hashFn); QUERY_CHECK_NULL(pSup->pResultRows, code, lino, _end, terrno); + // used for backward compatibility of function's result info + pSup->pState->pResultRowStore->resultRowGet = getResultRowFromBuf; + pSup->pState->pResultRowStore->resultRowPut = putResultRowToBuf; + pSup->pState->pExprSupp = pExpSup; + for (int32_t i = 0; i < numOfOutput; ++i) { pExpSup->pCtx[i].saveHandle.pState = pSup->pState; }