From 11ed8a1540fe444531d50ad920a1b49e919e894b Mon Sep 17 00:00:00 2001 From: Jing Sima Date: Wed, 25 Sep 2024 16:14:47 +0800 Subject: [PATCH 1/7] fix:[TD-32184] Support backwards compatibility for function's result info. --- include/libs/function/function.h | 12 ++ source/libs/executor/inc/executil.h | 4 + source/libs/executor/src/executil.c | 116 ++++++++++++++++++ .../executor/src/streamtimewindowoperator.c | 5 + 4 files changed, 137 insertions(+) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 7ca046762a..c66d74a905 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -29,6 +29,7 @@ struct SqlFunctionCtx; struct SResultRowEntryInfo; struct SFunctionNode; +struct SExprSupp; typedef struct SScalarParam SScalarParam; typedef struct SStreamState SStreamState; @@ -43,6 +44,7 @@ typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx); typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock); typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); typedef int32_t (*FExecCombine)(struct SqlFunctionCtx *pDestCtx, struct SqlFunctionCtx *pSourceCtx); +typedef int32_t (*FExecDecode)(struct SqlFunctionCtx *pCtx, const char *buf, struct SResultRowEntryInfo *pResultCellInfo, int32_t version); typedef int32_t (*processFuncByRow)(SArray* pCtx); // array of SqlFunctionCtx typedef struct SScalarFuncExecFuncs { @@ -57,6 +59,7 @@ typedef struct SFuncExecFuncs { FExecFinalize finalize; FExecCombine combine; FExecCleanUp cleanup; + FExecDecode decode; processFuncByRow processFuncByRow; } SFuncExecFuncs; @@ -65,6 +68,8 @@ typedef struct SFuncExecFuncs { #define TOP_BOTTOM_QUERY_LIMIT 100 #define FUNCTIONS_NAME_MAX_LENGTH 32 +#define FUNCTION_RESULT_INFO_VERSION 1 + typedef struct SResultRowEntryInfo { bool initialized : 1; // output buffer has been initialized bool complete : 1; // query has completed @@ -165,6 +170,11 @@ typedef struct STdbState { void *txn; } STdbState; +typedef struct SResultRowStore { + int32_t (*resultRowPut)(struct SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize); + int32_t (*resultRowGet)(struct SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize); +} SResultRowStore; + struct SStreamState { STdbState *pTdbState; struct SStreamFileState *pFileState; @@ -175,6 +185,8 @@ struct SStreamState { int64_t streamBackendRid; int8_t dump; int32_t tsIndex; + SResultRowStore *pResultRowStore; + struct SExprSupp *pExprSupp; }; typedef struct SFunctionStateStore { diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 95035dd96f..9e36a29476 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -48,6 +48,7 @@ typedef struct SGroupResInfo { } SGroupResInfo; typedef struct SResultRow { + int32_t version; int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer int32_t offset : 29; // row index in buffer page bool startInterp; // the time window start timestamp has done the interpolation already. @@ -152,6 +153,9 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo return pRow; } +int32_t getResultRowFromBuf(struct SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize); +int32_t putResultRowToBuf(struct SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize); + int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 4fe45ff72e..141d64bfd1 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -93,6 +93,122 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) { return rowSize; } +// Convert buf read from rocksdb to result row +int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize) { + SqlFunctionCtx *pCtx = pSup->pCtx; + int32_t *offset = pSup->rowEntryInfoOffset; + SResultRow *pResultRow = (SResultRow*)outBuf; + size_t processedSize = 0; + int32_t code = TSDB_CODE_SUCCESS; + if (inBuf == NULL) { + qError("invalid input buffer, inBuf:%p", inBuf); + return TSDB_CODE_INVALID_PARA; + } + + // calculate the size of output buffer + *outBufSize = getResultRowSize(pCtx, pSup->numOfExprs); + *outBuf = taosMemoryMalloc(*outBufSize); + if (*outBuf == NULL) { + qError("failed to allocate memory for output buffer, size:%zu", *outBufSize); + return terrno; + } + (void)memcpy(pResultRow, inBuf, sizeof(SResultRow)); + inBuf += sizeof(SResultRow); + processedSize += sizeof(SResultRow); + for (int32_t i = 0; i < pSup->numOfExprs; ++i) { + int32_t len = *(int32_t*)inBuf; + inBuf += sizeof(int32_t); + processedSize += sizeof(int32_t); + if (pCtx->fpSet.decode) { + code = pCtx->fpSet.decode(&pCtx[i], inBuf, getResultEntryInfo(pResultRow, i, offset), pResultRow->version); + if (code != TSDB_CODE_SUCCESS) { + qError("failed to decode result row, code:%d", code); + return code; + } + } else { + (void)memcpy(getResultEntryInfo(pResultRow, i, offset), inBuf, len); + } + inBuf += len; + processedSize += len; + } + void *pos = getResultEntryInfo(pResultRow, pSup->numOfExprs - 1, offset) + + sizeof(SResultRowEntryInfo) + + pCtx[pSup->numOfExprs - 1].resDataInfo.interBufSize; + (void)memcpy(pos, inBuf, pSup->numOfExprs * sizeof(bool)); + inBuf += pSup->numOfExprs * sizeof(bool); + processedSize += pSup->numOfExprs * sizeof(bool); + + if (processedSize < inBufSize) { + // stream stores extra data after result row + size_t leftLen = inBufSize - processedSize; + TAOS_MEMORY_REALLOC(*outBuf, *outBufSize + leftLen); + if (*outBuf == NULL) { + qError("failed to reallocate memory for output buffer, size:%zu", *outBufSize + leftLen); + return terrno; + } + (void)memcpy(outBuf + processedSize, inBuf, leftLen); + inBuf += leftLen; + processedSize += leftLen; + *outBufSize += leftLen; + } + return TSDB_CODE_SUCCESS; +} + +// Convert result row to buf for rocksdb +int32_t putResultRowToBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize) { + SqlFunctionCtx *pCtx = pSup->pCtx; + int32_t *offset = pSup->rowEntryInfoOffset; + SResultRow *pResultRow = (SResultRow*)inBuf; + size_t rowSize = getResultRowSize(pCtx, pSup->numOfExprs); + + if (inBuf == NULL) { + qError("invalid input buffer, inBuf:%p", inBuf); + return TSDB_CODE_INVALID_PARA; + } + if (rowSize > inBufSize) { + qError("invalid input buffer size, rowSize:%zu, inBufSize:%zu", rowSize, inBufSize); + return TSDB_CODE_INVALID_PARA; + } + + // calculate the size of output buffer + *outBufSize = rowSize + sizeof(int32_t) * pSup->numOfExprs; + if (rowSize < inBufSize) { + *outBufSize += inBufSize - rowSize; + } + + *outBuf = taosMemoryMalloc(*outBufSize); + if (*outBuf == NULL) { + qError("failed to allocate memory for output buffer, size:%zu", *outBufSize); + return terrno; + } + + pResultRow->version = FUNCTION_RESULT_INFO_VERSION; + (void)memcpy(outBuf, pResultRow, sizeof(SResultRow)); + outBuf += sizeof(SResultRow); + for (int32_t i = 0; i < pSup->numOfExprs; ++i) { + *(int32_t *) outBuf = offset[i]; + outBuf += sizeof(int32_t); + size_t len = sizeof(SResultRowEntryInfo) + pCtx[i].resDataInfo.interBufSize; + (void)memcpy(outBuf, getResultEntryInfo(pResultRow, i, offset), len); + outBuf += len; + } + + // mark if col is null for top/bottom result(saveTupleData) + void *pos = getResultEntryInfo(pResultRow, pSup->numOfExprs - 1, offset) + + sizeof(SResultRowEntryInfo) + + pCtx[pSup->numOfExprs - 1].resDataInfo.interBufSize; + + (void)memcpy(outBuf, pos, pSup->numOfExprs * sizeof(bool)); + + if (rowSize < inBufSize) { + // stream stores extra data after result row + size_t leftLen = inBufSize - rowSize; + (void)memcpy(outBuf, inBuf + rowSize, leftLen); + outBuf += leftLen; + } + return TSDB_CODE_SUCCESS; +} + static void freeEx(void* p) { taosMemoryFree(*(void**)p); } void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) { diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index be27f277c0..cf539cb4cb 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2232,6 +2232,11 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in pSup->pResultRows = tSimpleHashInit(32, hashFn); QUERY_CHECK_NULL(pSup->pResultRows, code, lino, _end, terrno); + // used for backward compatibility of function's result info + pSup->pState->pResultRowStore->resultRowGet = getResultRowFromBuf; + pSup->pState->pResultRowStore->resultRowPut = putResultRowToBuf; + pSup->pState->pExprSupp = pExpSup; + for (int32_t i = 0; i < numOfOutput; ++i) { pExpSup->pCtx[i].saveHandle.pState = pSup->pState; } From ecfa67510c12983056116ef902b60da56ff80641 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 8 Oct 2024 19:44:56 +0800 Subject: [PATCH 2/7] add serial --- source/libs/stream/src/streamBackendRocksdb.c | 152 +++++++++++++++--- source/libs/stream/src/tstreamFileState.c | 2 +- 2 files changed, 132 insertions(+), 22 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index c88971ab75..4326ac250a 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -3289,13 +3289,31 @@ int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const v int code = 0; SStateKey sKey = {.key = *key, .opNum = pState->number}; - STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)value, vLen); + char* dst = NULL; + size_t size = 0; + code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); + if (code != 0) { + return code; + } + STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)dst, size); + + taosMemoryFree(dst); return code; } int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int code = 0; SStateKey sKey = {.key = *key, .opNum = pState->number}; - STREAM_STATE_GET_ROCKSDB(pState, "state", &sKey, pVal, pVLen); + + char* tVal; + size_t tValLen = 0; + STREAM_STATE_GET_ROCKSDB(pState, "state", &sKey, &tVal, &tValLen); + if (code != 0) { + taosMemoryFree(tVal); + return code; + } + + code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); + taosMemoryFree(tVal); return code; } int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) { @@ -3541,14 +3559,31 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* // func cf int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) { - int code = 0; - STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)value, vLen); + int code = 0; + char* dst = NULL; + size_t size = 0; + code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); + if (code != 0) { + return code; + } + STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)dst, size); + taosMemoryFree(dst); + return code; } int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) { - int code = 0; - STREAM_STATE_GET_ROCKSDB(pState, "func", key, pVal, pVLen); - return 0; + int code = 0; + char* tVal = NULL; + size_t tValLen = 0; + STREAM_STATE_GET_ROCKSDB(pState, "func", key, tVal, &tValLen); + if (code != 0) { + taosMemoryFree(tVal); + return code; + } + code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); + + taosMemoryFree(tVal); + return code; } int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) { int code = 0; @@ -3563,7 +3598,15 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k if (value == NULL || vLen == 0) { stError("streamStateSessionPut_rocksdb val: %p, len: %d", value, vLen); } - STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, value, vLen); + char* dst = NULL; + size_t size = 0; + code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); + if (code != 0) { + return code; + } + STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, dst, size); + taosMemoryFree(dst); + return code; } int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { @@ -3861,13 +3904,30 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { int code = 0; - STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, value, vLen); + char* dst = NULL; + size_t size = 0; + code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); + if (code != 0) { + return code; + } + STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, dst, size); + + taosMemoryFree(dst); return code; } int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int code = 0; - STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen); + + char* tVal; + size_t tValLen = 0; + STREAM_STATE_GET_ROCKSDB(pState, "fill", key, &tVal, &tValLen); + if (code != 0) { + taosMemoryFree(tVal); + return code; + } + code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); + taosMemoryFree(tVal); return code; } int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { @@ -4204,21 +4264,44 @@ _end: #ifdef BUILD_NO_CALL // partag cf int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) { - int code = 0; - STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, tag, tagLen); + int code = 0; + char* dst = NULL; + size_t size = 0; + code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); + if (code != 0) { + return code; + } + STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, dst, size); + taosMemoryFree(dst); return code; } int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) { - int code = 0; - STREAM_STATE_GET_ROCKSDB(pState, "partag", &groupId, tagVal, tagLen); + int code = 0; + char* tVal; + size_t tValLen = 0; + STREAM_STATE_GET_ROCKSDB(pState, "partag", &groupId, &tVal, &tValLen); + if (code != 0) { + taosMemoryFree(tVal); + return code; + } + code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)tagVal, (size_t*)tagLen); + taosMemoryFree(tVal); + return code; } #endif // parname cfg int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { - int code = 0; - STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)tbname, TSDB_TABLE_NAME_LEN); + int code = 0; + char* dst = NULL; + size_t size = 0; + code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, tbname, TSDB_TABLE_NAME_LEN, &dst, &size); + if (code != 0) { + return code; + } + STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)dst, size); + taosMemoryFree(dst); return code; } int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal) { @@ -4229,13 +4312,30 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi } int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) { - int code = 0; - STREAM_STATE_PUT_ROCKSDB(pState, "default", key, pVal, pVLen); + int code = 0; + char* dst = NULL; + size_t size = 0; + code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, pVal, pVLen, &dst, &size); + if (code != 0) { + return code; + } + STREAM_STATE_PUT_ROCKSDB(pState, "default", key, dst, size); + taosMemoryFree(dst); return code; } int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) { - int code = 0; + int code = 0; + char* tVal; + size_t tValLen = 0; STREAM_STATE_GET_ROCKSDB(pState, "default", key, pVal, pVLen); + if (code != 0) { + taosMemoryFree(tVal); + return code; + } + + code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); + taosMemoryFree(tVal); + return code; } int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) { @@ -4377,10 +4477,18 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb_writebatch_t* pBatch, void* key, void* val, int32_t vlen, int64_t ttl, void* tmpBuf) { - char buf[128] = {0}; + char buf[128] = {0}; + + char* dst = NULL; + size_t size = 0; + int32_t code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, val, vlen, &dst, &size); + if (code != 0) { + return code; + } + int32_t klen = ginitDict[cfIdx].enFunc((void*)key, buf); char* ttlV = tmpBuf; - int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV); + int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(dst, size, ttl, &ttlV); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; @@ -4389,6 +4497,8 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx]; rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); + taosMemoryFree(dst); + if (tmpBuf == NULL) { taosMemoryFree(ttlV); } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index cf5f1b2b91..7237f23671 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -698,7 +698,7 @@ void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, boo int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName); - int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 64; + int32_t len = (pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2; char* buf = taosMemoryCalloc(1, len); if (!buf) { code = terrno; From 06121e6c9d9c13b28fdfb86dc89d71043096c9f8 Mon Sep 17 00:00:00 2001 From: Jing Sima Date: Thu, 10 Oct 2024 09:04:44 +0800 Subject: [PATCH 3/7] fix:[TD-32184] fix compile error. --- include/libs/function/function.h | 2 +- source/libs/executor/src/executil.c | 24 ++++--- .../executor/src/streamtimewindowoperator.c | 9 ++- source/libs/stream/src/streamBackendRocksdb.c | 70 ++++--------------- source/libs/stream/src/streamState.c | 3 + 5 files changed, 39 insertions(+), 69 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index c66d74a905..51d9e752a4 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -185,7 +185,7 @@ struct SStreamState { int64_t streamBackendRid; int8_t dump; int32_t tsIndex; - SResultRowStore *pResultRowStore; + SResultRowStore pResultRowStore; struct SExprSupp *pExprSupp; }; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 141d64bfd1..94b89ab64c 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -97,7 +97,7 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) { int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize) { SqlFunctionCtx *pCtx = pSup->pCtx; int32_t *offset = pSup->rowEntryInfoOffset; - SResultRow *pResultRow = (SResultRow*)outBuf; + SResultRow *pResultRow = NULL; size_t processedSize = 0; int32_t code = TSDB_CODE_SUCCESS; if (inBuf == NULL) { @@ -112,6 +112,7 @@ int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize qError("failed to allocate memory for output buffer, size:%zu", *outBufSize); return terrno; } + pResultRow = (SResultRow*)*outBuf; (void)memcpy(pResultRow, inBuf, sizeof(SResultRow)); inBuf += sizeof(SResultRow); processedSize += sizeof(SResultRow); @@ -146,7 +147,7 @@ int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize qError("failed to reallocate memory for output buffer, size:%zu", *outBufSize + leftLen); return terrno; } - (void)memcpy(outBuf + processedSize, inBuf, leftLen); + (void)memcpy(*outBuf + processedSize, inBuf, leftLen); inBuf += leftLen; processedSize += leftLen; *outBufSize += leftLen; @@ -182,15 +183,16 @@ int32_t putResultRowToBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, return terrno; } + char *pBuf = *outBuf; pResultRow->version = FUNCTION_RESULT_INFO_VERSION; - (void)memcpy(outBuf, pResultRow, sizeof(SResultRow)); - outBuf += sizeof(SResultRow); + (void)memcpy(pBuf, pResultRow, sizeof(SResultRow)); + pBuf += sizeof(SResultRow); for (int32_t i = 0; i < pSup->numOfExprs; ++i) { - *(int32_t *) outBuf = offset[i]; - outBuf += sizeof(int32_t); size_t len = sizeof(SResultRowEntryInfo) + pCtx[i].resDataInfo.interBufSize; - (void)memcpy(outBuf, getResultEntryInfo(pResultRow, i, offset), len); - outBuf += len; + *(int32_t *) pBuf = (int32_t)len; + pBuf += sizeof(int32_t); + (void)memcpy(pBuf, getResultEntryInfo(pResultRow, i, offset), len); + pBuf += len; } // mark if col is null for top/bottom result(saveTupleData) @@ -198,13 +200,13 @@ int32_t putResultRowToBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, sizeof(SResultRowEntryInfo) + pCtx[pSup->numOfExprs - 1].resDataInfo.interBufSize; - (void)memcpy(outBuf, pos, pSup->numOfExprs * sizeof(bool)); + (void)memcpy(pBuf, pos, pSup->numOfExprs * sizeof(bool)); if (rowSize < inBufSize) { // stream stores extra data after result row size_t leftLen = inBufSize - rowSize; - (void)memcpy(outBuf, inBuf + rowSize, leftLen); - outBuf += leftLen; + (void)memcpy(pBuf, inBuf + rowSize, leftLen); + pBuf += leftLen; } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index cf539cb4cb..fcd58fbe56 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2233,8 +2233,8 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in QUERY_CHECK_NULL(pSup->pResultRows, code, lino, _end, terrno); // used for backward compatibility of function's result info - pSup->pState->pResultRowStore->resultRowGet = getResultRowFromBuf; - pSup->pState->pResultRowStore->resultRowPut = putResultRowToBuf; + pSup->pState->pResultRowStore.resultRowGet = getResultRowFromBuf; + pSup->pState->pResultRowStore.resultRowPut = putResultRowToBuf; pSup->pState->pExprSupp = pExpSup; for (int32_t i = 0; i < numOfOutput; ++i) { @@ -5396,6 +5396,11 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* &pInfo->pState->pFileState); QUERY_CHECK_CODE(code, lino, _error); + // used for backward compatibility of function's result info + pInfo->pState->pResultRowStore.resultRowGet = getResultRowFromBuf; + pInfo->pState->pResultRowStore.resultRowPut = putResultRowToBuf; + pInfo->pState->pExprSupp = &pOperator->exprSupp; + pInfo->pOperator = pOperator; setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 4326ac250a..8a487ffeae 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -3291,11 +3291,11 @@ int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const v SStateKey sKey = {.key = *key, .opNum = pState->number}; char* dst = NULL; size_t size = 0; - code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); + code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); if (code != 0) { return code; } - STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)dst, size); + STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)dst, (int32_t)size); taosMemoryFree(dst); return code; @@ -3312,7 +3312,7 @@ int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** return code; } - code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); + code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); taosMemoryFree(tVal); return code; } @@ -3562,11 +3562,11 @@ int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, c int code = 0; char* dst = NULL; size_t size = 0; - code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); + code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); if (code != 0) { return code; } - STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)dst, size); + STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)dst, (int32_t)size); taosMemoryFree(dst); return code; @@ -3580,7 +3580,7 @@ int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, v taosMemoryFree(tVal); return code; } - code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); + code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); taosMemoryFree(tVal); return code; @@ -3600,11 +3600,11 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k } char* dst = NULL; size_t size = 0; - code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); + code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); if (code != 0) { return code; } - STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, dst, size); + STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, dst, (int32_t)size); taosMemoryFree(dst); return code; @@ -3904,30 +3904,14 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { int code = 0; - char* dst = NULL; - size_t size = 0; - code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); - if (code != 0) { - return code; - } - STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, dst, size); - - taosMemoryFree(dst); + STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, value, vLen); return code; } int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int code = 0; - char* tVal; - size_t tValLen = 0; - STREAM_STATE_GET_ROCKSDB(pState, "fill", key, &tVal, &tValLen); - if (code != 0) { - taosMemoryFree(tVal); - return code; - } - code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); - taosMemoryFree(tVal); + STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen); return code; } int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { @@ -4267,7 +4251,7 @@ int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, cons int code = 0; char* dst = NULL; size_t size = 0; - code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); + code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); if (code != 0) { return code; } @@ -4285,7 +4269,7 @@ int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void taosMemoryFree(tVal); return code; } - code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)tagVal, (size_t*)tagLen); + code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)tagVal, (size_t*)tagLen); taosMemoryFree(tVal); return code; @@ -4294,14 +4278,7 @@ int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void // parname cfg int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { int code = 0; - char* dst = NULL; - size_t size = 0; - code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, tbname, TSDB_TABLE_NAME_LEN, &dst, &size); - if (code != 0) { - return code; - } - STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)dst, size); - taosMemoryFree(dst); + STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)tbname, TSDB_TABLE_NAME_LEN); return code; } int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal) { @@ -4313,29 +4290,12 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) { int code = 0; - char* dst = NULL; - size_t size = 0; - code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, pVal, pVLen, &dst, &size); - if (code != 0) { - return code; - } - STREAM_STATE_PUT_ROCKSDB(pState, "default", key, dst, size); - taosMemoryFree(dst); + STREAM_STATE_PUT_ROCKSDB(pState, "default", key, pVal, pVLen); return code; } int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) { int code = 0; - char* tVal; - size_t tValLen = 0; STREAM_STATE_GET_ROCKSDB(pState, "default", key, pVal, pVLen); - if (code != 0) { - taosMemoryFree(tVal); - return code; - } - - code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); - taosMemoryFree(tVal); - return code; } int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) { @@ -4481,7 +4441,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb char* dst = NULL; size_t size = 0; - int32_t code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, val, vlen, &dst, &size); + int32_t code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, val, vlen, &dst, &size); if (code != 0) { return code; } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 1994c882aa..4c83f1b109 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -528,6 +528,9 @@ void streamStateCopyBackend(SStreamState* src, SStreamState* dst) { } dst->dump = 1; dst->pTdbState->pOwner->pBackend = src->pTdbState->pOwner->pBackend; + dst->pResultRowStore.resultRowPut = src->pResultRowStore.resultRowPut; + dst->pResultRowStore.resultRowGet = src->pResultRowStore.resultRowGet; + dst->pExprSupp = src->pExprSupp; return; } SStreamStateCur* createStreamStateCursor() { From 5aeb1ec2ad75de2e08ff6042b61d78c238464fd1 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 10 Oct 2024 21:10:15 +0800 Subject: [PATCH 4/7] add interface --- source/libs/stream/inc/streamBackendRocksdb.h | 17 +-- source/libs/stream/src/streamBackendRocksdb.c | 102 +++++++++++++----- source/libs/stream/src/streamSessionState.c | 24 +++-- source/libs/stream/src/tstreamFileState.c | 2 +- 4 files changed, 96 insertions(+), 49 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 567d9de949..c4cf6a47cd 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -80,7 +80,7 @@ typedef struct { TdThreadRwlock chkpDirLock; int64_t dataWritten; - void* pMeta; + void* pMeta; int8_t removeAllFiles; } STaskDbWrapper; @@ -153,7 +153,7 @@ void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId); void* taskDbAddRef(void* pTaskDb); void taskDbRemoveRef(void* pTaskDb); -void taskDbSetClearFileFlag(void* pTaskDb); +void taskDbSetClearFileFlag(void* pTaskDb); int streamStateOpenBackend(void* backend, SStreamState* pState); void streamStateCloseBackend(SStreamState* pState, bool remove); @@ -191,7 +191,8 @@ SStreamStateCur* streamStateSessionSeekKeyPrev_rocksdb(SStreamState* pState, con SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState, int64_t groupId); int32_t streamStateSessionCurPrev_rocksdb(SStreamStateCur* pCur); -int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen); +int32_t streamStateSessionGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SSessionKey* pKey, + void** pVal, int32_t* pVLen); int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey); int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen); @@ -255,11 +256,11 @@ int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo); int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId); -int32_t bkdMgtCreate(char* path, SBkdMgt **bm); -int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path); -int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* name); -int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname); -void bkdMgtDestroy(SBkdMgt* bm); +int32_t bkdMgtCreate(char* path, SBkdMgt** bm); +int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path); +int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* name); +int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname); +void bkdMgtDestroy(SBkdMgt* bm); int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list, const char* id); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 8a487ffeae..d469580d04 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -3291,13 +3291,16 @@ int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const v SStateKey sKey = {.key = *key, .opNum = pState->number}; char* dst = NULL; size_t size = 0; - code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); - if (code != 0) { - return code; + if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) { + STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)value, (int32_t)vLen); + } else { + code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); + if (code != 0) { + return code; + } + STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)dst, (int32_t)size); + taosMemoryFree(dst); } - STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)dst, (int32_t)size); - - taosMemoryFree(dst); return code; } int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { @@ -3311,7 +3314,11 @@ int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** taosMemoryFree(tVal); return code; } - + if (pState->pResultRowStore.resultRowGet == NULL || pState->pExprSupp == NULL) { + *pVal = tVal; + *pVLen = tValLen; + return code; + } code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); taosMemoryFree(tVal); return code; @@ -3580,6 +3587,13 @@ int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, v taosMemoryFree(tVal); return code; } + + if (pState->pResultRowStore.resultRowGet == NULL || pState->pExprSupp == NULL) { + *pVal = tVal; + *pVLen = tValLen; + return code; + } + code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); taosMemoryFree(tVal); @@ -3600,6 +3614,11 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k } char* dst = NULL; size_t size = 0; + if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) { + STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, (void*)value, (int32_t)vLen); + return code; + } + code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); if (code != 0) { return code; @@ -3617,7 +3636,7 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo void* tmp = NULL; int32_t vLen = 0; - code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen); + code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, &tmp, &vLen); if (code == 0 && key->win.skey == resKey.win.skey) { *key = resKey; @@ -3856,7 +3875,8 @@ SStreamStateCur* streamStateSessionSeekKeyPrev_rocksdb(SStreamState* pState, con return pCur; } -int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) { +int32_t streamStateSessionGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SSessionKey* pKey, + void** pVal, int32_t* pVLen) { if (!pCur) { return -1; } @@ -3890,13 +3910,27 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* return -1; } + char* tVal = val; + size_t tVlen = len; + if (pVal != NULL) { - *pVal = (char*)val; + if (pState != NULL && pState->pResultRowStore.resultRowGet != NULL && pState->pExprSupp != NULL) { + int code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, val, len, (char**)&tVal, (size_t*)&tVlen); + if (code != 0) { + taosMemoryFree(val); + return code; + } + taosMemoryFree(val); + *pVal = (char*)tVal; + } else { + *pVal = (char*)tVal; + } } else { taosMemoryFree(val); } - if (pVLen != NULL) *pVLen = len; + if (pVLen != NULL) *pVLen = tVlen; + *pKey = pKTmp->key; return 0; } @@ -4085,7 +4119,7 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes c = stateSessionKeyCmpr(&sKey, sizeof(sKey), &iKey, sizeof(iKey)); SSessionKey resKey = *key; - int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, NULL, 0); + int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, NULL, NULL); if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) { *curKey = resKey; streamStateFreeCur(pCur); @@ -4094,7 +4128,7 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes if (c > 0) { streamStateCurNext_rocksdb(pCur); - code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, NULL, 0); + code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, NULL, NULL); if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) { *curKey = resKey; streamStateFreeCur(pCur); @@ -4102,7 +4136,7 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes } } else if (c < 0) { streamStateCurPrev(pState, pCur); - code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, NULL, 0); + code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &resKey, NULL, NULL); if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) { *curKey = resKey; streamStateFreeCur(pCur); @@ -4132,7 +4166,7 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe } SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key); - int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen); + int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen); if (code == 0) { if (sessionRangeKeyCmpr(&searchKey, key) == 0) { @@ -4149,7 +4183,7 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key); } - code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen); + code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen); if (code == 0) { if (sessionRangeKeyCmpr(&searchKey, key) == 0) { memcpy(tmp, *pVal, *pVLen); @@ -4176,7 +4210,7 @@ void streamStateSessionClear_rocksdb(SStreamState* pState) { SSessionKey delKey = {0}; void* buf = NULL; int32_t size = 0; - int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &delKey, &buf, &size); + int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, &delKey, &buf, &size); if (code == 0 && size > 0) { memset(buf, 0, size); // refactor later @@ -4204,7 +4238,7 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* } SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key); - int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen); + int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen); if (code == 0) { if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) { memcpy(tmp, *pVal, valSize); @@ -4224,7 +4258,7 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key); } taosMemoryFreeClear(*pVal); - code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen); + code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen); if (code == 0) { void* stateKey = (char*)(*pVal) + (valSize - keyDataLen); if (fn(pKeyData, stateKey) == true) { @@ -4251,6 +4285,10 @@ int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, cons int code = 0; char* dst = NULL; size_t size = 0; + if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) { + STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, tag, tagLen); + return code; + } code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); if (code != 0) { return code; @@ -4277,7 +4315,7 @@ int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void #endif // parname cfg int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { - int code = 0; + int code = 0; STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)tbname, TSDB_TABLE_NAME_LEN); return code; } @@ -4289,12 +4327,12 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi } int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) { - int code = 0; + int code = 0; STREAM_STATE_PUT_ROCKSDB(pState, "default", key, pVal, pVLen); return code; } int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) { - int code = 0; + int code = 0; STREAM_STATE_GET_ROCKSDB(pState, "default", key, pVal, pVLen); return code; } @@ -4437,15 +4475,21 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb_writebatch_t* pBatch, void* key, void* val, int32_t vlen, int64_t ttl, void* tmpBuf) { - char buf[128] = {0}; + int32_t code = 0; + char buf[128] = {0}; - char* dst = NULL; - size_t size = 0; - int32_t code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, val, vlen, &dst, &size); - if (code != 0) { - return code; + char* dst = NULL; + size_t size = 0; + if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) { + dst = val; + size = vlen; + return -1; + } else { + code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, val, vlen, &dst, &size); + if (code != 0) { + return code; + } } - int32_t klen = ginitDict[cfIdx].enFunc((void*)key, buf); char* ttlV = tmpBuf; int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(dst, size, ttl, &ttlV); diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 7e3d8d59f9..bb8ea6c03c 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -284,7 +284,7 @@ _end: int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen, int32_t* pWinCode) { - SWinKey* pTmpkey = pKey; + SWinKey* pTmpkey = pKey; SSessionKey pWinKey = {.groupId = pTmpkey->groupId, .win.skey = pTmpkey->ts, .win.ekey = pTmpkey->ts}; return getSessionWinResultBuff(pFileState, &pWinKey, 0, pVal, pVLen, pWinCode); } @@ -343,7 +343,8 @@ _end: return code; } -int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen, int32_t* pWinCode) { +int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen, + int32_t* pWinCode) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); @@ -353,7 +354,7 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v } pNewPos->needFree = true; pNewPos->beFlushed = true; - void* pBuff = NULL; + void* pBuff = NULL; (*pWinCode) = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen); if ((*pWinCode) != TSDB_CODE_SUCCESS) { goto _end; @@ -575,7 +576,7 @@ static void transformCursor(SStreamFileState* pFileState, SStreamStateCur* pCur) static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates, SStreamStateCur** ppCur) { SSessionKey key = {.groupId = groupId}; - int32_t code = streamStateSessionGetKVByCur_rocksdb(*ppCur, &key, NULL, NULL); + int32_t code = streamStateSessionGetKVByCur_rocksdb(NULL, *ppCur, &key, NULL, NULL); if (taosArrayGetSize(pWinStates) > 0 && (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) { if (!(*ppCur)) { @@ -653,7 +654,7 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS SSessionKey key = {0}; void* pVal = NULL; int len = 0; - int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &len); + int32_t code = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, &key, &pVal, &len); if (code == TSDB_CODE_FAILED) { streamStateFreeCur(pCur); return pBuffCur; @@ -667,7 +668,7 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS } streamStateCurPrev(pFileStore, pCur); while (1) { - code = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &len); + code = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, &key, &pVal, &len); if (code == TSDB_CODE_FAILED) { streamStateCurNext(pFileStore, pCur); return pCur; @@ -710,7 +711,7 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void *pKey = *(SSessionKey*)(pPos->pKey); } else { void* pData = NULL; - code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, &pData, pVLen); + code = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, pKey, &pData, pVLen); if (taosArrayGetSize(pWinStates) > 0 && (code == TSDB_CODE_FAILED || sessionStateRangeKeyCompare(pKey, pWinStates, 0) >= 0)) { transformCursor(pCur->pStreamFileState, pCur); @@ -915,7 +916,7 @@ _end: int32_t getCountWinStateFromDisc(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen) { SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, pKey); - int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); + int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, pKey, pVal, pVLen); streamStateFreeCur(pCur); if (code == TSDB_CODE_SUCCESS) { return code; @@ -923,7 +924,7 @@ int32_t getCountWinStateFromDisc(SStreamState* pState, SSessionKey* pKey, void** pCur = streamStateSessionSeekKeyPrev_rocksdb(pState, pKey); } - code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); + code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, pKey, pVal, pVLen); streamStateFreeCur(pCur); return code; } @@ -1060,7 +1061,8 @@ _end: return code; } -int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen) { +int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, + int32_t* pVLen) { SSessionKey* pWinKey = pKey; const TSKEY gap = 0; int32_t code = TSDB_CODE_SUCCESS; @@ -1098,7 +1100,7 @@ int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey QUERY_CHECK_CODE(code, lino, _end); } qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, - pWinKey->win.ekey, code_file); + pWinKey->win.ekey, code_file); } } else { code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 7237f23671..6a102743cd 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -849,7 +849,7 @@ int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId) { void* pVal = NULL; int32_t vlen = 0; SSessionKey key = {0}; - winRes = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &vlen); + winRes = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, &key, &pVal, &vlen); if (winRes != TSDB_CODE_SUCCESS) { break; } From 193220aa88c695d89b43e8bb93ef6dfdd66d4c8a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 10 Oct 2024 21:28:13 +0800 Subject: [PATCH 5/7] add interface --- source/libs/stream/src/streamBackendRocksdb.c | 6 +++++- source/libs/stream/test/backendTest.cpp | 6 +++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index d469580d04..f91c26638a 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -3307,7 +3307,7 @@ int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** int code = 0; SStateKey sKey = {.key = *key, .opNum = pState->number}; - char* tVal; + char* tVal = NULL; size_t tValLen = 0; STREAM_STATE_GET_ROCKSDB(pState, "state", &sKey, &tVal, &tValLen); if (code != 0) { @@ -3569,6 +3569,10 @@ int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, c int code = 0; char* dst = NULL; size_t size = 0; + if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) { + STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)value, (int32_t)vLen); + return code; + } code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); if (code != 0) { return code; diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index e7e7149882..1518d22fe9 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -228,17 +228,17 @@ void *backendOpen() { memset(&key, 0, sizeof(key)); char *val = NULL; int32_t vlen = 0; - code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen); + code = streamStateSessionGetKVByCur_rocksdb(NULL, pCurr, &key, (void **)&val, &vlen); ASSERT(code == 0); pCurr = streamStateSessionSeekKeyPrev_rocksdb(p, &key); - code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen); + code = streamStateSessionGetKVByCur_rocksdb(NULL, pCurr, &key, (void **)&val, &vlen); ASSERT(code == 0); ASSERT(key.groupId == 0 && key.win.ekey == tsArray[tsArray.size() - 2]); pCurr = streamStateSessionSeekKeyNext_rocksdb(p, &key); - code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen); + code = streamStateSessionGetKVByCur_rocksdb(NULL, pCurr, &key, (void **)&val, &vlen); ASSERT(code == 0); ASSERT(vlen == strlen("Value")); ASSERT(key.groupId == 0 && key.win.skey == tsArray[tsArray.size() - 1]); From cf3db4f1a6b911710240d6a59c5790a8a1cc35ae Mon Sep 17 00:00:00 2001 From: Jing Sima Date: Fri, 11 Oct 2024 08:43:05 +0800 Subject: [PATCH 6/7] fix:[TD-32184] fix heap use after free. --- source/libs/executor/src/executil.c | 35 +++++--------- .../executor/src/streamtimewindowoperator.c | 28 +++++++---- source/libs/stream/inc/streamBackendRocksdb.h | 2 +- source/libs/stream/src/streamBackendRocksdb.c | 47 ++++++++++++++++--- source/libs/stream/src/streamSessionState.c | 6 +-- source/libs/stream/src/streamState.c | 2 +- source/libs/stream/src/tstreamFileState.c | 5 +- 7 files changed, 77 insertions(+), 48 deletions(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 94b89ab64c..fd67468731 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -88,22 +88,20 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) { rowSize += pCtx[i].resDataInfo.interBufSize; } - rowSize += (numOfOutput * sizeof(bool)); - // expand rowSize to mark if col is null for top/bottom result(saveTupleData) return rowSize; } // Convert buf read from rocksdb to result row int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize) { + if (inBuf == NULL || pSup == NULL) { + qError("invalid input parameters, inBuf:%p, pSup:%p", inBuf, pSup); + return TSDB_CODE_INVALID_PARA; + } SqlFunctionCtx *pCtx = pSup->pCtx; int32_t *offset = pSup->rowEntryInfoOffset; SResultRow *pResultRow = NULL; size_t processedSize = 0; int32_t code = TSDB_CODE_SUCCESS; - if (inBuf == NULL) { - qError("invalid input buffer, inBuf:%p", inBuf); - return TSDB_CODE_INVALID_PARA; - } // calculate the size of output buffer *outBufSize = getResultRowSize(pCtx, pSup->numOfExprs); @@ -116,6 +114,7 @@ int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize (void)memcpy(pResultRow, inBuf, sizeof(SResultRow)); inBuf += sizeof(SResultRow); processedSize += sizeof(SResultRow); + for (int32_t i = 0; i < pSup->numOfExprs; ++i) { int32_t len = *(int32_t*)inBuf; inBuf += sizeof(int32_t); @@ -132,12 +131,6 @@ int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize inBuf += len; processedSize += len; } - void *pos = getResultEntryInfo(pResultRow, pSup->numOfExprs - 1, offset) + - sizeof(SResultRowEntryInfo) + - pCtx[pSup->numOfExprs - 1].resDataInfo.interBufSize; - (void)memcpy(pos, inBuf, pSup->numOfExprs * sizeof(bool)); - inBuf += pSup->numOfExprs * sizeof(bool); - processedSize += pSup->numOfExprs * sizeof(bool); if (processedSize < inBufSize) { // stream stores extra data after result row @@ -147,7 +140,7 @@ int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize qError("failed to reallocate memory for output buffer, size:%zu", *outBufSize + leftLen); return terrno; } - (void)memcpy(*outBuf + processedSize, inBuf, leftLen); + (void)memcpy(*outBuf + *outBufSize, inBuf, leftLen); inBuf += leftLen; processedSize += leftLen; *outBufSize += leftLen; @@ -157,15 +150,16 @@ int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize // Convert result row to buf for rocksdb int32_t putResultRowToBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize) { + if (pSup == NULL || inBuf == NULL || outBuf == NULL || outBufSize == NULL) { + qError("invalid input parameters, inBuf:%p, pSup:%p, outBufSize:%p, outBuf:%p", inBuf, pSup, outBufSize, outBuf); + return TSDB_CODE_INVALID_PARA; + } + SqlFunctionCtx *pCtx = pSup->pCtx; int32_t *offset = pSup->rowEntryInfoOffset; SResultRow *pResultRow = (SResultRow*)inBuf; size_t rowSize = getResultRowSize(pCtx, pSup->numOfExprs); - if (inBuf == NULL) { - qError("invalid input buffer, inBuf:%p", inBuf); - return TSDB_CODE_INVALID_PARA; - } if (rowSize > inBufSize) { qError("invalid input buffer size, rowSize:%zu, inBufSize:%zu", rowSize, inBufSize); return TSDB_CODE_INVALID_PARA; @@ -195,13 +189,6 @@ int32_t putResultRowToBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, pBuf += len; } - // mark if col is null for top/bottom result(saveTupleData) - void *pos = getResultEntryInfo(pResultRow, pSup->numOfExprs - 1, offset) + - sizeof(SResultRowEntryInfo) + - pCtx[pSup->numOfExprs - 1].resDataInfo.interBufSize; - - (void)memcpy(pBuf, pos, pSup->numOfExprs * sizeof(bool)); - if (rowSize < inBufSize) { // stream stores extra data after result row size_t leftLen = inBufSize - rowSize; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index fcd58fbe56..fc919dfe5f 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2006,6 +2006,12 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN pInfo->stateStore = pTaskInfo->storageAPI.stateStore; int32_t funResSize = getMaxFunResSize(&pOperator->exprSupp, numOfCols); pInfo->pState->pFileState = NULL; + + // used for backward compatibility of function's result info + pInfo->pState->pResultRowStore.resultRowGet = getResultRowFromBuf; + pInfo->pState->pResultRowStore.resultRowPut = putResultRowToBuf; + pInfo->pState->pExprSupp = &pOperator->exprSupp; + code = pAPI->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), @@ -2223,6 +2229,12 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex); int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput); pSup->pState->pFileState = NULL; + + // used for backward compatibility of function's result info + pSup->pState->pResultRowStore.resultRowGet = getResultRowFromBuf; + pSup->pState->pResultRowStore.resultRowPut = putResultRowToBuf; + pSup->pState->pExprSupp = pExpSup; + code = pSup->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState, pTwAggSup->deleteMark, taskIdStr, pHandle->checkpointId, STREAM_STATE_BUFF_SORT, &pSup->pState->pFileState); @@ -2232,11 +2244,6 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in pSup->pResultRows = tSimpleHashInit(32, hashFn); QUERY_CHECK_NULL(pSup->pResultRows, code, lino, _end, terrno); - // used for backward compatibility of function's result info - pSup->pState->pResultRowStore.resultRowGet = getResultRowFromBuf; - pSup->pState->pResultRowStore.resultRowPut = putResultRowToBuf; - pSup->pState->pExprSupp = pExpSup; - for (int32_t i = 0; i < numOfOutput; ++i) { pExpSup->pCtx[i].saveHandle.pState = pSup->pState; } @@ -5390,17 +5397,18 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->pState->pFileState = NULL; - code = pTaskInfo->storageAPI.stateStore.streamFileStateInit( - tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, - pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH, - &pInfo->pState->pFileState); - QUERY_CHECK_CODE(code, lino, _error); // used for backward compatibility of function's result info pInfo->pState->pResultRowStore.resultRowGet = getResultRowFromBuf; pInfo->pState->pResultRowStore.resultRowPut = putResultRowToBuf; pInfo->pState->pExprSupp = &pOperator->exprSupp; + code = pTaskInfo->storageAPI.stateStore.streamFileStateInit( + tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, + pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH, + &pInfo->pState->pFileState); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->pOperator = pOperator; setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index c4cf6a47cd..1e0801fb6b 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -169,7 +169,7 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key); int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); void streamStateCurPrev_rocksdb(SStreamStateCur* pCur); -int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); +int32_t streamStateGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key); SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f91c26638a..c0bde10774 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -3319,7 +3319,9 @@ int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** *pVLen = tValLen; return code; } - code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); + size_t pValLen = 0; + code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, &pValLen); + *pVLen = (int32_t)pValLen; taosMemoryFree(tVal); return code; } @@ -3376,7 +3378,7 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) { } SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp); - code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0); + code = streamStateGetKVByCur_rocksdb(pState, pCur, key, NULL, 0); if (code != 0) { return code; } @@ -3420,7 +3422,8 @@ void streamStateCurPrev_rocksdb(SStreamStateCur* pCur) { rocksdb_iter_prev(pCur->iter); } } -int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { +int32_t streamStateGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, + int32_t* pVLen) { if (!pCur) return -1; SStateKey tkey; SStateKey* pKtmp = &tkey; @@ -3436,7 +3439,35 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons if (pVLen != NULL) { size_t vlen = 0; const char* valStr = rocksdb_iter_value(pCur->iter, &vlen); - *pVLen = valueDecode((void*)valStr, vlen, NULL, (char**)pVal); + char* val = NULL; + int32_t len = valueDecode((void*)valStr, vlen, NULL, (char**)val); + if (len <= 0) { + taosMemoryFree(val); + return -1; + } + + char* tVal = val; + size_t tVlen = len; + + if (pVal != NULL) { + if (pState != NULL && pState->pResultRowStore.resultRowGet != NULL && pState->pExprSupp != NULL) { + int code = + (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, val, len, (char**)&tVal, (size_t*)&tVlen); + if (code != 0) { + taosMemoryFree(val); + return code; + } + taosMemoryFree(val); + *pVal = (char*)tVal; + } else { + stInfo("streamStateGetKVByCur_rocksdb, pState = %p, pResultRowStore = %p, pExprSupp = %p", pState, + pState->pResultRowStore.resultRowGet, pState->pExprSupp); + *pVal = (char*)tVal; + } + } else { + taosMemoryFree(val); + } + *pVLen = (int32_t)tVlen; } *pKey = pKtmp->key; @@ -3598,7 +3629,9 @@ int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, v return code; } - code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); + size_t pValLen = 0; + code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, &pValLen); + *pVLen = (int32_t)pValLen; taosMemoryFree(tVal); return code; @@ -3933,7 +3966,7 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamState* pState, SStreamStateC taosMemoryFree(val); } - if (pVLen != NULL) *pVLen = tVlen; + if (pVLen != NULL) *pVLen = (int32_t)tVlen; *pKey = pKTmp->key; return 0; @@ -4450,6 +4483,7 @@ void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_ void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); } int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key, void* val, int32_t vlen, int64_t ttl) { + int32_t code = 0; STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1)); @@ -4487,7 +4521,6 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) { dst = val; size = vlen; - return -1; } else { code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, val, vlen, &dst, &size); if (code != 0) { diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index bb8ea6c03c..536636533f 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -576,7 +576,7 @@ static void transformCursor(SStreamFileState* pFileState, SStreamStateCur* pCur) static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates, SStreamStateCur** ppCur) { SSessionKey key = {.groupId = groupId}; - int32_t code = streamStateSessionGetKVByCur_rocksdb(NULL, *ppCur, &key, NULL, NULL); + int32_t code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), *ppCur, &key, NULL, NULL); if (taosArrayGetSize(pWinStates) > 0 && (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) { if (!(*ppCur)) { @@ -654,7 +654,7 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS SSessionKey key = {0}; void* pVal = NULL; int len = 0; - int32_t code = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, &key, &pVal, &len); + int32_t code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, &key, &pVal, &len); if (code == TSDB_CODE_FAILED) { streamStateFreeCur(pCur); return pBuffCur; @@ -711,7 +711,7 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void *pKey = *(SSessionKey*)(pPos->pKey); } else { void* pData = NULL; - code = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, pKey, &pData, pVLen); + code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pCur->pStreamFileState), pCur, pKey, &pData, pVLen); if (taosArrayGetSize(pWinStates) > 0 && (code == TSDB_CODE_FAILED || sessionStateRangeKeyCompare(pKey, pWinStates, 0) >= 0)) { transformCursor(pCur->pStreamFileState, pCur); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 4c83f1b109..2e6a724912 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -302,7 +302,7 @@ SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) { } int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { - return streamStateGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); + return streamStateGetKVByCur_rocksdb(getStateFileStore(pCur->pStreamFileState), pCur, pKey, pVal, pVLen); } int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 6a102743cd..424845e4f2 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -849,7 +849,7 @@ int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId) { void* pVal = NULL; int32_t vlen = 0; SSessionKey key = {0}; - winRes = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, &key, &pVal, &vlen); + winRes = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, &key, &pVal, &vlen); if (winRes != TSDB_CODE_SUCCESS) { break; } @@ -903,7 +903,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { QUERY_CHECK_CODE(code, lino, _end); } - winCode = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen); + winCode = streamStateGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, pNewPos->pKey, (const void**)&pVal, &vlen); if (winCode != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) { destroyRowBuffPos(pNewPos); SListNode* pNode = tdListPopTail(pFileState->usedBuffs); @@ -912,6 +912,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { break; } if (vlen != pFileState->rowSize) { + qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen); code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; QUERY_CHECK_CODE(code, lino, _end); } From 9bd22e4e6162a308137c2bb373d298b332e7c7bf Mon Sep 17 00:00:00 2001 From: Jing Sima Date: Mon, 14 Oct 2024 14:05:58 +0800 Subject: [PATCH 7/7] fix:[TD-32184] result row do decode only when version is different. --- source/libs/executor/src/executil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index fd67468731..a87f6f2789 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -119,7 +119,7 @@ int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize int32_t len = *(int32_t*)inBuf; inBuf += sizeof(int32_t); processedSize += sizeof(int32_t); - if (pCtx->fpSet.decode) { + if (pResultRow->version != FUNCTION_RESULT_INFO_VERSION && pCtx->fpSet.decode) { code = pCtx->fpSet.decode(&pCtx[i], inBuf, getResultEntryInfo(pResultRow, i, offset), pResultRow->version); if (code != TSDB_CODE_SUCCESS) { qError("failed to decode result row, code:%d", code);