fix(stream):set buff size for selectivity function
This commit is contained in:
parent
3b3a0fbe62
commit
8c6dc37533
|
@ -4166,7 +4166,7 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
|
if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
|
||||||
memcpy(tmp, *pVal, valSize);
|
memcpy(tmp, *pVal, *pVLen);
|
||||||
taosMemoryFreeClear(*pVal);
|
taosMemoryFreeClear(*pVal);
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
@ -4182,7 +4182,7 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe
|
||||||
code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen);
|
code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
|
if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
|
||||||
memcpy(tmp, *pVal, valSize);
|
memcpy(tmp, *pVal, *pVLen);
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -170,11 +170,12 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void*
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
void* pVal = NULL;
|
void* pVal = NULL;
|
||||||
int32_t len = getRowStateRowSize(pState->pFileState);
|
int32_t len = getRowStateRowSize(pState->pFileState);
|
||||||
code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
|
int32_t tmpLen = len;
|
||||||
|
code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &tmpLen);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
|
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
|
||||||
uint32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
|
int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
|
||||||
memcpy(buf + len - rowSize, value, vLen);
|
memcpy(buf + len - rowSize, value, vLen);
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
|
@ -188,11 +189,12 @@ int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVa
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
void* pVal = NULL;
|
void* pVal = NULL;
|
||||||
int32_t len = getRowStateRowSize(pState->pFileState);
|
int32_t len = getRowStateRowSize(pState->pFileState);
|
||||||
code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
|
int32_t tmpLen = len;
|
||||||
|
code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &tmpLen);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
|
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
|
||||||
uint32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
|
int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
|
||||||
*ppVal = buf + len - rowSize;
|
*ppVal = buf + len - rowSize;
|
||||||
streamStateReleaseBuf(pState, pVal, false);
|
streamStateReleaseBuf(pState, pVal, false);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue