fix issue
This commit is contained in:
parent
3365da365f
commit
b30097cc88
|
@ -2238,10 +2238,12 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
|
||||||
*(pSup->pState) = *pState;
|
*(pSup->pState) = *pState;
|
||||||
pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex);
|
pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex);
|
||||||
int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput);
|
int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput);
|
||||||
// used for backward compatibility of function's result info
|
if (stateType != STREAM_STATE_BUFF_HASH_SORT) {
|
||||||
pSup->pState->pResultRowStore.resultRowGet = getResultRowFromBuf;
|
// used for backward compatibility of function's result info
|
||||||
pSup->pState->pResultRowStore.resultRowPut = putResultRowToBuf;
|
pSup->pState->pResultRowStore.resultRowGet = getResultRowFromBuf;
|
||||||
pSup->pState->pExprSupp = pExpSup;
|
pSup->pState->pResultRowStore.resultRowPut = putResultRowToBuf;
|
||||||
|
pSup->pState->pExprSupp = pExpSup;
|
||||||
|
}
|
||||||
|
|
||||||
if (stateType == STREAM_STATE_BUFF_SORT) {
|
if (stateType == STREAM_STATE_BUFF_SORT) {
|
||||||
pSup->pState->pFileState = NULL;
|
pSup->pState->pFileState = NULL;
|
||||||
|
|
|
@ -4328,7 +4328,7 @@ int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, cons
|
||||||
int code = 0;
|
int code = 0;
|
||||||
char* dst = NULL;
|
char* dst = NULL;
|
||||||
size_t size = 0;
|
size_t size = 0;
|
||||||
if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) {
|
if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL || tag == NULL) {
|
||||||
STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, tag, tagLen);
|
STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, tag, tagLen);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -4607,7 +4607,9 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
|
||||||
rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx];
|
rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx];
|
||||||
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
|
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
|
||||||
|
|
||||||
taosMemoryFree(dst);
|
if (pState->pResultRowStore.resultRowPut != NULL && pState->pExprSupp != NULL) {
|
||||||
|
taosMemoryFree(dst);
|
||||||
|
}
|
||||||
|
|
||||||
if (tmpBuf == NULL) {
|
if (tmpBuf == NULL) {
|
||||||
taosMemoryFree(ttlV);
|
taosMemoryFree(ttlV);
|
||||||
|
|
Loading…
Reference in New Issue