From 06121e6c9d9c13b28fdfb86dc89d71043096c9f8 Mon Sep 17 00:00:00 2001 From: Jing Sima Date: Thu, 10 Oct 2024 09:04:44 +0800 Subject: [PATCH] fix:[TD-32184] fix compile error. --- include/libs/function/function.h | 2 +- source/libs/executor/src/executil.c | 24 ++++--- .../executor/src/streamtimewindowoperator.c | 9 ++- source/libs/stream/src/streamBackendRocksdb.c | 70 ++++--------------- source/libs/stream/src/streamState.c | 3 + 5 files changed, 39 insertions(+), 69 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index c66d74a905..51d9e752a4 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -185,7 +185,7 @@ struct SStreamState { int64_t streamBackendRid; int8_t dump; int32_t tsIndex; - SResultRowStore *pResultRowStore; + SResultRowStore pResultRowStore; struct SExprSupp *pExprSupp; }; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 141d64bfd1..94b89ab64c 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -97,7 +97,7 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) { int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize) { SqlFunctionCtx *pCtx = pSup->pCtx; int32_t *offset = pSup->rowEntryInfoOffset; - SResultRow *pResultRow = (SResultRow*)outBuf; + SResultRow *pResultRow = NULL; size_t processedSize = 0; int32_t code = TSDB_CODE_SUCCESS; if (inBuf == NULL) { @@ -112,6 +112,7 @@ int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize qError("failed to allocate memory for output buffer, size:%zu", *outBufSize); return terrno; } + pResultRow = (SResultRow*)*outBuf; (void)memcpy(pResultRow, inBuf, sizeof(SResultRow)); inBuf += sizeof(SResultRow); processedSize += sizeof(SResultRow); @@ -146,7 +147,7 @@ int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize qError("failed to reallocate memory for output buffer, size:%zu", *outBufSize + leftLen); return terrno; } - (void)memcpy(outBuf + processedSize, inBuf, leftLen); + (void)memcpy(*outBuf + processedSize, inBuf, leftLen); inBuf += leftLen; processedSize += leftLen; *outBufSize += leftLen; @@ -182,15 +183,16 @@ int32_t putResultRowToBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, return terrno; } + char *pBuf = *outBuf; pResultRow->version = FUNCTION_RESULT_INFO_VERSION; - (void)memcpy(outBuf, pResultRow, sizeof(SResultRow)); - outBuf += sizeof(SResultRow); + (void)memcpy(pBuf, pResultRow, sizeof(SResultRow)); + pBuf += sizeof(SResultRow); for (int32_t i = 0; i < pSup->numOfExprs; ++i) { - *(int32_t *) outBuf = offset[i]; - outBuf += sizeof(int32_t); size_t len = sizeof(SResultRowEntryInfo) + pCtx[i].resDataInfo.interBufSize; - (void)memcpy(outBuf, getResultEntryInfo(pResultRow, i, offset), len); - outBuf += len; + *(int32_t *) pBuf = (int32_t)len; + pBuf += sizeof(int32_t); + (void)memcpy(pBuf, getResultEntryInfo(pResultRow, i, offset), len); + pBuf += len; } // mark if col is null for top/bottom result(saveTupleData) @@ -198,13 +200,13 @@ int32_t putResultRowToBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, sizeof(SResultRowEntryInfo) + pCtx[pSup->numOfExprs - 1].resDataInfo.interBufSize; - (void)memcpy(outBuf, pos, pSup->numOfExprs * sizeof(bool)); + (void)memcpy(pBuf, pos, pSup->numOfExprs * sizeof(bool)); if (rowSize < inBufSize) { // stream stores extra data after result row size_t leftLen = inBufSize - rowSize; - (void)memcpy(outBuf, inBuf + rowSize, leftLen); - outBuf += leftLen; + (void)memcpy(pBuf, inBuf + rowSize, leftLen); + pBuf += leftLen; } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index cf539cb4cb..fcd58fbe56 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2233,8 +2233,8 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in QUERY_CHECK_NULL(pSup->pResultRows, code, lino, _end, terrno); // used for backward compatibility of function's result info - pSup->pState->pResultRowStore->resultRowGet = getResultRowFromBuf; - pSup->pState->pResultRowStore->resultRowPut = putResultRowToBuf; + pSup->pState->pResultRowStore.resultRowGet = getResultRowFromBuf; + pSup->pState->pResultRowStore.resultRowPut = putResultRowToBuf; pSup->pState->pExprSupp = pExpSup; for (int32_t i = 0; i < numOfOutput; ++i) { @@ -5396,6 +5396,11 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* &pInfo->pState->pFileState); QUERY_CHECK_CODE(code, lino, _error); + // used for backward compatibility of function's result info + pInfo->pState->pResultRowStore.resultRowGet = getResultRowFromBuf; + pInfo->pState->pResultRowStore.resultRowPut = putResultRowToBuf; + pInfo->pState->pExprSupp = &pOperator->exprSupp; + pInfo->pOperator = pOperator; setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 4326ac250a..8a487ffeae 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -3291,11 +3291,11 @@ int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const v SStateKey sKey = {.key = *key, .opNum = pState->number}; char* dst = NULL; size_t size = 0; - code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); + code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); if (code != 0) { return code; } - STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)dst, size); + STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)dst, (int32_t)size); taosMemoryFree(dst); return code; @@ -3312,7 +3312,7 @@ int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** return code; } - code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); + code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); taosMemoryFree(tVal); return code; } @@ -3562,11 +3562,11 @@ int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, c int code = 0; char* dst = NULL; size_t size = 0; - code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); + code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); if (code != 0) { return code; } - STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)dst, size); + STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)dst, (int32_t)size); taosMemoryFree(dst); return code; @@ -3580,7 +3580,7 @@ int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, v taosMemoryFree(tVal); return code; } - code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); + code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); taosMemoryFree(tVal); return code; @@ -3600,11 +3600,11 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k } char* dst = NULL; size_t size = 0; - code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); + code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); if (code != 0) { return code; } - STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, dst, size); + STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, dst, (int32_t)size); taosMemoryFree(dst); return code; @@ -3904,30 +3904,14 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { int code = 0; - char* dst = NULL; - size_t size = 0; - code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); - if (code != 0) { - return code; - } - STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, dst, size); - - taosMemoryFree(dst); + STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, value, vLen); return code; } int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int code = 0; - char* tVal; - size_t tValLen = 0; - STREAM_STATE_GET_ROCKSDB(pState, "fill", key, &tVal, &tValLen); - if (code != 0) { - taosMemoryFree(tVal); - return code; - } - code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); - taosMemoryFree(tVal); + STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen); return code; } int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { @@ -4267,7 +4251,7 @@ int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, cons int code = 0; char* dst = NULL; size_t size = 0; - code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); + code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); if (code != 0) { return code; } @@ -4285,7 +4269,7 @@ int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void taosMemoryFree(tVal); return code; } - code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)tagVal, (size_t*)tagLen); + code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)tagVal, (size_t*)tagLen); taosMemoryFree(tVal); return code; @@ -4294,14 +4278,7 @@ int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void // parname cfg int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { int code = 0; - char* dst = NULL; - size_t size = 0; - code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, tbname, TSDB_TABLE_NAME_LEN, &dst, &size); - if (code != 0) { - return code; - } - STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)dst, size); - taosMemoryFree(dst); + STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)tbname, TSDB_TABLE_NAME_LEN); return code; } int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal) { @@ -4313,29 +4290,12 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) { int code = 0; - char* dst = NULL; - size_t size = 0; - code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, pVal, pVLen, &dst, &size); - if (code != 0) { - return code; - } - STREAM_STATE_PUT_ROCKSDB(pState, "default", key, dst, size); - taosMemoryFree(dst); + STREAM_STATE_PUT_ROCKSDB(pState, "default", key, pVal, pVLen); return code; } int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) { int code = 0; - char* tVal; - size_t tValLen = 0; STREAM_STATE_GET_ROCKSDB(pState, "default", key, pVal, pVLen); - if (code != 0) { - taosMemoryFree(tVal); - return code; - } - - code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); - taosMemoryFree(tVal); - return code; } int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) { @@ -4481,7 +4441,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb char* dst = NULL; size_t size = 0; - int32_t code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, val, vlen, &dst, &size); + int32_t code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, val, vlen, &dst, &size); if (code != 0) { return code; } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 1994c882aa..4c83f1b109 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -528,6 +528,9 @@ void streamStateCopyBackend(SStreamState* src, SStreamState* dst) { } dst->dump = 1; dst->pTdbState->pOwner->pBackend = src->pTdbState->pOwner->pBackend; + dst->pResultRowStore.resultRowPut = src->pResultRowStore.resultRowPut; + dst->pResultRowStore.resultRowGet = src->pResultRowStore.resultRowGet; + dst->pExprSupp = src->pExprSupp; return; } SStreamStateCur* createStreamStateCursor() {