fix:[TD-32184] Support backwards compatibility for function's result info.
This commit is contained in:
parent
80e2e426f0
commit
11ed8a1540
|
@ -29,6 +29,7 @@ struct SqlFunctionCtx;
|
||||||
struct SResultRowEntryInfo;
|
struct SResultRowEntryInfo;
|
||||||
|
|
||||||
struct SFunctionNode;
|
struct SFunctionNode;
|
||||||
|
struct SExprSupp;
|
||||||
typedef struct SScalarParam SScalarParam;
|
typedef struct SScalarParam SScalarParam;
|
||||||
typedef struct SStreamState SStreamState;
|
typedef struct SStreamState SStreamState;
|
||||||
|
|
||||||
|
@ -43,6 +44,7 @@ typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx);
|
||||||
typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
|
typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
|
||||||
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
typedef int32_t (*FExecCombine)(struct SqlFunctionCtx *pDestCtx, struct SqlFunctionCtx *pSourceCtx);
|
typedef int32_t (*FExecCombine)(struct SqlFunctionCtx *pDestCtx, struct SqlFunctionCtx *pSourceCtx);
|
||||||
|
typedef int32_t (*FExecDecode)(struct SqlFunctionCtx *pCtx, const char *buf, struct SResultRowEntryInfo *pResultCellInfo, int32_t version);
|
||||||
typedef int32_t (*processFuncByRow)(SArray* pCtx); // array of SqlFunctionCtx
|
typedef int32_t (*processFuncByRow)(SArray* pCtx); // array of SqlFunctionCtx
|
||||||
|
|
||||||
typedef struct SScalarFuncExecFuncs {
|
typedef struct SScalarFuncExecFuncs {
|
||||||
|
@ -57,6 +59,7 @@ typedef struct SFuncExecFuncs {
|
||||||
FExecFinalize finalize;
|
FExecFinalize finalize;
|
||||||
FExecCombine combine;
|
FExecCombine combine;
|
||||||
FExecCleanUp cleanup;
|
FExecCleanUp cleanup;
|
||||||
|
FExecDecode decode;
|
||||||
processFuncByRow processFuncByRow;
|
processFuncByRow processFuncByRow;
|
||||||
} SFuncExecFuncs;
|
} SFuncExecFuncs;
|
||||||
|
|
||||||
|
@ -65,6 +68,8 @@ typedef struct SFuncExecFuncs {
|
||||||
#define TOP_BOTTOM_QUERY_LIMIT 100
|
#define TOP_BOTTOM_QUERY_LIMIT 100
|
||||||
#define FUNCTIONS_NAME_MAX_LENGTH 32
|
#define FUNCTIONS_NAME_MAX_LENGTH 32
|
||||||
|
|
||||||
|
#define FUNCTION_RESULT_INFO_VERSION 1
|
||||||
|
|
||||||
typedef struct SResultRowEntryInfo {
|
typedef struct SResultRowEntryInfo {
|
||||||
bool initialized : 1; // output buffer has been initialized
|
bool initialized : 1; // output buffer has been initialized
|
||||||
bool complete : 1; // query has completed
|
bool complete : 1; // query has completed
|
||||||
|
@ -165,6 +170,11 @@ typedef struct STdbState {
|
||||||
void *txn;
|
void *txn;
|
||||||
} STdbState;
|
} STdbState;
|
||||||
|
|
||||||
|
typedef struct SResultRowStore {
|
||||||
|
int32_t (*resultRowPut)(struct SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize);
|
||||||
|
int32_t (*resultRowGet)(struct SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize);
|
||||||
|
} SResultRowStore;
|
||||||
|
|
||||||
struct SStreamState {
|
struct SStreamState {
|
||||||
STdbState *pTdbState;
|
STdbState *pTdbState;
|
||||||
struct SStreamFileState *pFileState;
|
struct SStreamFileState *pFileState;
|
||||||
|
@ -175,6 +185,8 @@ struct SStreamState {
|
||||||
int64_t streamBackendRid;
|
int64_t streamBackendRid;
|
||||||
int8_t dump;
|
int8_t dump;
|
||||||
int32_t tsIndex;
|
int32_t tsIndex;
|
||||||
|
SResultRowStore *pResultRowStore;
|
||||||
|
struct SExprSupp *pExprSupp;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct SFunctionStateStore {
|
typedef struct SFunctionStateStore {
|
||||||
|
|
|
@ -48,6 +48,7 @@ typedef struct SGroupResInfo {
|
||||||
} SGroupResInfo;
|
} SGroupResInfo;
|
||||||
|
|
||||||
typedef struct SResultRow {
|
typedef struct SResultRow {
|
||||||
|
int32_t version;
|
||||||
int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer
|
int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer
|
||||||
int32_t offset : 29; // row index in buffer page
|
int32_t offset : 29; // row index in buffer page
|
||||||
bool startInterp; // the time window start timestamp has done the interpolation already.
|
bool startInterp; // the time window start timestamp has done the interpolation already.
|
||||||
|
@ -152,6 +153,9 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo
|
||||||
return pRow;
|
return pRow;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t getResultRowFromBuf(struct SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize);
|
||||||
|
int32_t putResultRowToBuf(struct SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize);
|
||||||
|
|
||||||
int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order);
|
int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order);
|
||||||
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
|
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
|
||||||
|
|
||||||
|
|
|
@ -93,6 +93,122 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||||
return rowSize;
|
return rowSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Convert buf read from rocksdb to result row
|
||||||
|
int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize) {
|
||||||
|
SqlFunctionCtx *pCtx = pSup->pCtx;
|
||||||
|
int32_t *offset = pSup->rowEntryInfoOffset;
|
||||||
|
SResultRow *pResultRow = (SResultRow*)outBuf;
|
||||||
|
size_t processedSize = 0;
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
if (inBuf == NULL) {
|
||||||
|
qError("invalid input buffer, inBuf:%p", inBuf);
|
||||||
|
return TSDB_CODE_INVALID_PARA;
|
||||||
|
}
|
||||||
|
|
||||||
|
// calculate the size of output buffer
|
||||||
|
*outBufSize = getResultRowSize(pCtx, pSup->numOfExprs);
|
||||||
|
*outBuf = taosMemoryMalloc(*outBufSize);
|
||||||
|
if (*outBuf == NULL) {
|
||||||
|
qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
(void)memcpy(pResultRow, inBuf, sizeof(SResultRow));
|
||||||
|
inBuf += sizeof(SResultRow);
|
||||||
|
processedSize += sizeof(SResultRow);
|
||||||
|
for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
|
||||||
|
int32_t len = *(int32_t*)inBuf;
|
||||||
|
inBuf += sizeof(int32_t);
|
||||||
|
processedSize += sizeof(int32_t);
|
||||||
|
if (pCtx->fpSet.decode) {
|
||||||
|
code = pCtx->fpSet.decode(&pCtx[i], inBuf, getResultEntryInfo(pResultRow, i, offset), pResultRow->version);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("failed to decode result row, code:%d", code);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
(void)memcpy(getResultEntryInfo(pResultRow, i, offset), inBuf, len);
|
||||||
|
}
|
||||||
|
inBuf += len;
|
||||||
|
processedSize += len;
|
||||||
|
}
|
||||||
|
void *pos = getResultEntryInfo(pResultRow, pSup->numOfExprs - 1, offset) +
|
||||||
|
sizeof(SResultRowEntryInfo) +
|
||||||
|
pCtx[pSup->numOfExprs - 1].resDataInfo.interBufSize;
|
||||||
|
(void)memcpy(pos, inBuf, pSup->numOfExprs * sizeof(bool));
|
||||||
|
inBuf += pSup->numOfExprs * sizeof(bool);
|
||||||
|
processedSize += pSup->numOfExprs * sizeof(bool);
|
||||||
|
|
||||||
|
if (processedSize < inBufSize) {
|
||||||
|
// stream stores extra data after result row
|
||||||
|
size_t leftLen = inBufSize - processedSize;
|
||||||
|
TAOS_MEMORY_REALLOC(*outBuf, *outBufSize + leftLen);
|
||||||
|
if (*outBuf == NULL) {
|
||||||
|
qError("failed to reallocate memory for output buffer, size:%zu", *outBufSize + leftLen);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
(void)memcpy(outBuf + processedSize, inBuf, leftLen);
|
||||||
|
inBuf += leftLen;
|
||||||
|
processedSize += leftLen;
|
||||||
|
*outBufSize += leftLen;
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert result row to buf for rocksdb
|
||||||
|
int32_t putResultRowToBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize) {
|
||||||
|
SqlFunctionCtx *pCtx = pSup->pCtx;
|
||||||
|
int32_t *offset = pSup->rowEntryInfoOffset;
|
||||||
|
SResultRow *pResultRow = (SResultRow*)inBuf;
|
||||||
|
size_t rowSize = getResultRowSize(pCtx, pSup->numOfExprs);
|
||||||
|
|
||||||
|
if (inBuf == NULL) {
|
||||||
|
qError("invalid input buffer, inBuf:%p", inBuf);
|
||||||
|
return TSDB_CODE_INVALID_PARA;
|
||||||
|
}
|
||||||
|
if (rowSize > inBufSize) {
|
||||||
|
qError("invalid input buffer size, rowSize:%zu, inBufSize:%zu", rowSize, inBufSize);
|
||||||
|
return TSDB_CODE_INVALID_PARA;
|
||||||
|
}
|
||||||
|
|
||||||
|
// calculate the size of output buffer
|
||||||
|
*outBufSize = rowSize + sizeof(int32_t) * pSup->numOfExprs;
|
||||||
|
if (rowSize < inBufSize) {
|
||||||
|
*outBufSize += inBufSize - rowSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
*outBuf = taosMemoryMalloc(*outBufSize);
|
||||||
|
if (*outBuf == NULL) {
|
||||||
|
qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
pResultRow->version = FUNCTION_RESULT_INFO_VERSION;
|
||||||
|
(void)memcpy(outBuf, pResultRow, sizeof(SResultRow));
|
||||||
|
outBuf += sizeof(SResultRow);
|
||||||
|
for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
|
||||||
|
*(int32_t *) outBuf = offset[i];
|
||||||
|
outBuf += sizeof(int32_t);
|
||||||
|
size_t len = sizeof(SResultRowEntryInfo) + pCtx[i].resDataInfo.interBufSize;
|
||||||
|
(void)memcpy(outBuf, getResultEntryInfo(pResultRow, i, offset), len);
|
||||||
|
outBuf += len;
|
||||||
|
}
|
||||||
|
|
||||||
|
// mark if col is null for top/bottom result(saveTupleData)
|
||||||
|
void *pos = getResultEntryInfo(pResultRow, pSup->numOfExprs - 1, offset) +
|
||||||
|
sizeof(SResultRowEntryInfo) +
|
||||||
|
pCtx[pSup->numOfExprs - 1].resDataInfo.interBufSize;
|
||||||
|
|
||||||
|
(void)memcpy(outBuf, pos, pSup->numOfExprs * sizeof(bool));
|
||||||
|
|
||||||
|
if (rowSize < inBufSize) {
|
||||||
|
// stream stores extra data after result row
|
||||||
|
size_t leftLen = inBufSize - rowSize;
|
||||||
|
(void)memcpy(outBuf, inBuf + rowSize, leftLen);
|
||||||
|
outBuf += leftLen;
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static void freeEx(void* p) { taosMemoryFree(*(void**)p); }
|
static void freeEx(void* p) { taosMemoryFree(*(void**)p); }
|
||||||
|
|
||||||
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
|
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
|
||||||
|
|
|
@ -2232,6 +2232,11 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
|
||||||
pSup->pResultRows = tSimpleHashInit(32, hashFn);
|
pSup->pResultRows = tSimpleHashInit(32, hashFn);
|
||||||
QUERY_CHECK_NULL(pSup->pResultRows, code, lino, _end, terrno);
|
QUERY_CHECK_NULL(pSup->pResultRows, code, lino, _end, terrno);
|
||||||
|
|
||||||
|
// used for backward compatibility of function's result info
|
||||||
|
pSup->pState->pResultRowStore->resultRowGet = getResultRowFromBuf;
|
||||||
|
pSup->pState->pResultRowStore->resultRowPut = putResultRowToBuf;
|
||||||
|
pSup->pState->pExprSupp = pExpSup;
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
pExpSup->pCtx[i].saveHandle.pState = pSup->pState;
|
pExpSup->pCtx[i].saveHandle.pState = pSup->pState;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue