fix:[TD-32184] fix compile error.

This commit is contained in:
Jing Sima 2024-10-10 09:04:44 +08:00
parent ecfa67510c
commit 06121e6c9d
5 changed files with 39 additions and 69 deletions

View File

@ -185,7 +185,7 @@ struct SStreamState {
int64_t streamBackendRid; int64_t streamBackendRid;
int8_t dump; int8_t dump;
int32_t tsIndex; int32_t tsIndex;
SResultRowStore *pResultRowStore; SResultRowStore pResultRowStore;
struct SExprSupp *pExprSupp; struct SExprSupp *pExprSupp;
}; };

View File

@ -97,7 +97,7 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize) { int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize) {
SqlFunctionCtx *pCtx = pSup->pCtx; SqlFunctionCtx *pCtx = pSup->pCtx;
int32_t *offset = pSup->rowEntryInfoOffset; int32_t *offset = pSup->rowEntryInfoOffset;
SResultRow *pResultRow = (SResultRow*)outBuf; SResultRow *pResultRow = NULL;
size_t processedSize = 0; size_t processedSize = 0;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (inBuf == NULL) { if (inBuf == NULL) {
@ -112,6 +112,7 @@ int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize
qError("failed to allocate memory for output buffer, size:%zu", *outBufSize); qError("failed to allocate memory for output buffer, size:%zu", *outBufSize);
return terrno; return terrno;
} }
pResultRow = (SResultRow*)*outBuf;
(void)memcpy(pResultRow, inBuf, sizeof(SResultRow)); (void)memcpy(pResultRow, inBuf, sizeof(SResultRow));
inBuf += sizeof(SResultRow); inBuf += sizeof(SResultRow);
processedSize += sizeof(SResultRow); processedSize += sizeof(SResultRow);
@ -146,7 +147,7 @@ int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize
qError("failed to reallocate memory for output buffer, size:%zu", *outBufSize + leftLen); qError("failed to reallocate memory for output buffer, size:%zu", *outBufSize + leftLen);
return terrno; return terrno;
} }
(void)memcpy(outBuf + processedSize, inBuf, leftLen); (void)memcpy(*outBuf + processedSize, inBuf, leftLen);
inBuf += leftLen; inBuf += leftLen;
processedSize += leftLen; processedSize += leftLen;
*outBufSize += leftLen; *outBufSize += leftLen;
@ -182,15 +183,16 @@ int32_t putResultRowToBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize,
return terrno; return terrno;
} }
char *pBuf = *outBuf;
pResultRow->version = FUNCTION_RESULT_INFO_VERSION; pResultRow->version = FUNCTION_RESULT_INFO_VERSION;
(void)memcpy(outBuf, pResultRow, sizeof(SResultRow)); (void)memcpy(pBuf, pResultRow, sizeof(SResultRow));
outBuf += sizeof(SResultRow); pBuf += sizeof(SResultRow);
for (int32_t i = 0; i < pSup->numOfExprs; ++i) { for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
*(int32_t *) outBuf = offset[i];
outBuf += sizeof(int32_t);
size_t len = sizeof(SResultRowEntryInfo) + pCtx[i].resDataInfo.interBufSize; size_t len = sizeof(SResultRowEntryInfo) + pCtx[i].resDataInfo.interBufSize;
(void)memcpy(outBuf, getResultEntryInfo(pResultRow, i, offset), len); *(int32_t *) pBuf = (int32_t)len;
outBuf += len; pBuf += sizeof(int32_t);
(void)memcpy(pBuf, getResultEntryInfo(pResultRow, i, offset), len);
pBuf += len;
} }
// mark if col is null for top/bottom result(saveTupleData) // mark if col is null for top/bottom result(saveTupleData)
@ -198,13 +200,13 @@ int32_t putResultRowToBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize,
sizeof(SResultRowEntryInfo) + sizeof(SResultRowEntryInfo) +
pCtx[pSup->numOfExprs - 1].resDataInfo.interBufSize; pCtx[pSup->numOfExprs - 1].resDataInfo.interBufSize;
(void)memcpy(outBuf, pos, pSup->numOfExprs * sizeof(bool)); (void)memcpy(pBuf, pos, pSup->numOfExprs * sizeof(bool));
if (rowSize < inBufSize) { if (rowSize < inBufSize) {
// stream stores extra data after result row // stream stores extra data after result row
size_t leftLen = inBufSize - rowSize; size_t leftLen = inBufSize - rowSize;
(void)memcpy(outBuf, inBuf + rowSize, leftLen); (void)memcpy(pBuf, inBuf + rowSize, leftLen);
outBuf += leftLen; pBuf += leftLen;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -2233,8 +2233,8 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
QUERY_CHECK_NULL(pSup->pResultRows, code, lino, _end, terrno); QUERY_CHECK_NULL(pSup->pResultRows, code, lino, _end, terrno);
// used for backward compatibility of function's result info // used for backward compatibility of function's result info
pSup->pState->pResultRowStore->resultRowGet = getResultRowFromBuf; pSup->pState->pResultRowStore.resultRowGet = getResultRowFromBuf;
pSup->pState->pResultRowStore->resultRowPut = putResultRowToBuf; pSup->pState->pResultRowStore.resultRowPut = putResultRowToBuf;
pSup->pState->pExprSupp = pExpSup; pSup->pState->pExprSupp = pExpSup;
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
@ -5396,6 +5396,11 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
&pInfo->pState->pFileState); &pInfo->pState->pFileState);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
// used for backward compatibility of function's result info
pInfo->pState->pResultRowStore.resultRowGet = getResultRowFromBuf;
pInfo->pState->pResultRowStore.resultRowPut = putResultRowToBuf;
pInfo->pState->pExprSupp = &pOperator->exprSupp;
pInfo->pOperator = pOperator; pInfo->pOperator = pOperator;
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
pInfo, pTaskInfo); pInfo, pTaskInfo);

View File

@ -3291,11 +3291,11 @@ int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const v
SStateKey sKey = {.key = *key, .opNum = pState->number}; SStateKey sKey = {.key = *key, .opNum = pState->number};
char* dst = NULL; char* dst = NULL;
size_t size = 0; size_t size = 0;
code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
if (code != 0) { if (code != 0) {
return code; return code;
} }
STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)dst, size); STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)dst, (int32_t)size);
taosMemoryFree(dst); taosMemoryFree(dst);
return code; return code;
@ -3312,7 +3312,7 @@ int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void**
return code; return code;
} }
code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen);
taosMemoryFree(tVal); taosMemoryFree(tVal);
return code; return code;
} }
@ -3562,11 +3562,11 @@ int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, c
int code = 0; int code = 0;
char* dst = NULL; char* dst = NULL;
size_t size = 0; size_t size = 0;
code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
if (code != 0) { if (code != 0) {
return code; return code;
} }
STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)dst, size); STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)dst, (int32_t)size);
taosMemoryFree(dst); taosMemoryFree(dst);
return code; return code;
@ -3580,7 +3580,7 @@ int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, v
taosMemoryFree(tVal); taosMemoryFree(tVal);
return code; return code;
} }
code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen);
taosMemoryFree(tVal); taosMemoryFree(tVal);
return code; return code;
@ -3600,11 +3600,11 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k
} }
char* dst = NULL; char* dst = NULL;
size_t size = 0; size_t size = 0;
code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
if (code != 0) { if (code != 0) {
return code; return code;
} }
STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, dst, size); STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, dst, (int32_t)size);
taosMemoryFree(dst); taosMemoryFree(dst);
return code; return code;
@ -3904,30 +3904,14 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey*
int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
int code = 0; int code = 0;
char* dst = NULL; STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, value, vLen);
size_t size = 0;
code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
if (code != 0) {
return code;
}
STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, dst, size);
taosMemoryFree(dst);
return code; return code;
} }
int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
int code = 0; int code = 0;
char* tVal; STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen);
size_t tValLen = 0;
STREAM_STATE_GET_ROCKSDB(pState, "fill", key, &tVal, &tValLen);
if (code != 0) {
taosMemoryFree(tVal);
return code;
}
code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen);
taosMemoryFree(tVal);
return code; return code;
} }
int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
@ -4267,7 +4251,7 @@ int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, cons
int code = 0; int code = 0;
char* dst = NULL; char* dst = NULL;
size_t size = 0; size_t size = 0;
code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size);
if (code != 0) { if (code != 0) {
return code; return code;
} }
@ -4285,7 +4269,7 @@ int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void
taosMemoryFree(tVal); taosMemoryFree(tVal);
return code; return code;
} }
code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)tagVal, (size_t*)tagLen); code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)tagVal, (size_t*)tagLen);
taosMemoryFree(tVal); taosMemoryFree(tVal);
return code; return code;
@ -4294,14 +4278,7 @@ int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void
// parname cfg // parname cfg
int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
int code = 0; int code = 0;
char* dst = NULL; STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)tbname, TSDB_TABLE_NAME_LEN);
size_t size = 0;
code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, tbname, TSDB_TABLE_NAME_LEN, &dst, &size);
if (code != 0) {
return code;
}
STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)dst, size);
taosMemoryFree(dst);
return code; return code;
} }
int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal) { int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal) {
@ -4313,29 +4290,12 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) { int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) {
int code = 0; int code = 0;
char* dst = NULL; STREAM_STATE_PUT_ROCKSDB(pState, "default", key, pVal, pVLen);
size_t size = 0;
code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, pVal, pVLen, &dst, &size);
if (code != 0) {
return code;
}
STREAM_STATE_PUT_ROCKSDB(pState, "default", key, dst, size);
taosMemoryFree(dst);
return code; return code;
} }
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) { int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) {
int code = 0; int code = 0;
char* tVal;
size_t tValLen = 0;
STREAM_STATE_GET_ROCKSDB(pState, "default", key, pVal, pVLen); STREAM_STATE_GET_ROCKSDB(pState, "default", key, pVal, pVLen);
if (code != 0) {
taosMemoryFree(tVal);
return code;
}
code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen);
taosMemoryFree(tVal);
return code; return code;
} }
int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) { int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) {
@ -4481,7 +4441,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
char* dst = NULL; char* dst = NULL;
size_t size = 0; size_t size = 0;
int32_t code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, val, vlen, &dst, &size); int32_t code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, val, vlen, &dst, &size);
if (code != 0) { if (code != 0) {
return code; return code;
} }

View File

@ -528,6 +528,9 @@ void streamStateCopyBackend(SStreamState* src, SStreamState* dst) {
} }
dst->dump = 1; dst->dump = 1;
dst->pTdbState->pOwner->pBackend = src->pTdbState->pOwner->pBackend; dst->pTdbState->pOwner->pBackend = src->pTdbState->pOwner->pBackend;
dst->pResultRowStore.resultRowPut = src->pResultRowStore.resultRowPut;
dst->pResultRowStore.resultRowGet = src->pResultRowStore.resultRowGet;
dst->pExprSupp = src->pExprSupp;
return; return;
} }
SStreamStateCur* createStreamStateCursor() { SStreamStateCur* createStreamStateCursor() {