commit
5445e836de
|
@ -851,6 +851,7 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
|
||||||
if (hasPrevWindow(pFillSup)) {
|
if (hasPrevWindow(pFillSup)) {
|
||||||
setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
|
setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
|
||||||
pFillInfo->pos = FILL_POS_END;
|
pFillInfo->pos = FILL_POS_END;
|
||||||
|
resetFillWindow(&pFillSup->next);
|
||||||
pFillSup->next.key = pFillSup->cur.key;
|
pFillSup->next.key = pFillSup->cur.key;
|
||||||
pFillSup->next.pRowVal = pFillSup->cur.pRowVal;
|
pFillSup->next.pRowVal = pFillSup->cur.pRowVal;
|
||||||
pFillInfo->preRowKey = INT64_MIN;
|
pFillInfo->preRowKey = INT64_MIN;
|
||||||
|
|
|
@ -2905,6 +2905,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
|
||||||
SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param;
|
SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param;
|
||||||
cleanupBasicInfo(&pInfo->binfo);
|
cleanupBasicInfo(&pInfo->binfo);
|
||||||
destroyStreamAggSupporter(&pInfo->streamAggSup);
|
destroyStreamAggSupporter(&pInfo->streamAggSup);
|
||||||
|
cleanupExprSupp(&pInfo->scalarSupp);
|
||||||
|
|
||||||
if (pInfo->pChildren != NULL) {
|
if (pInfo->pChildren != NULL) {
|
||||||
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||||
|
@ -4096,6 +4097,7 @@ void destroyStreamStateOperatorInfo(void* param) {
|
||||||
cleanupBasicInfo(&pInfo->binfo);
|
cleanupBasicInfo(&pInfo->binfo);
|
||||||
destroyStreamAggSupporter(&pInfo->streamAggSup);
|
destroyStreamAggSupporter(&pInfo->streamAggSup);
|
||||||
cleanupGroupResInfo(&pInfo->groupResInfo);
|
cleanupGroupResInfo(&pInfo->groupResInfo);
|
||||||
|
cleanupExprSupp(&pInfo->scalarSupp);
|
||||||
if (pInfo->pChildren != NULL) {
|
if (pInfo->pChildren != NULL) {
|
||||||
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||||
for (int32_t i = 0; i < size; i++) {
|
for (int32_t i = 0; i < size; i++) {
|
||||||
|
@ -4109,6 +4111,7 @@ void destroyStreamStateOperatorInfo(void* param) {
|
||||||
taosArrayDestroy(pInfo->historyWins);
|
taosArrayDestroy(pInfo->historyWins);
|
||||||
tSimpleHashCleanup(pInfo->pSeUpdated);
|
tSimpleHashCleanup(pInfo->pSeUpdated);
|
||||||
tSimpleHashCleanup(pInfo->pSeDeleted);
|
tSimpleHashCleanup(pInfo->pSeDeleted);
|
||||||
|
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1613,6 +1613,9 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey*
|
||||||
const char* curKey = rocksdb_iter_key(pCur->iter, (size_t*)&kLen);
|
const char* curKey = rocksdb_iter_key(pCur->iter, (size_t*)&kLen);
|
||||||
stateSessionKeyDecode((void*)&ktmp, (char*)curKey);
|
stateSessionKeyDecode((void*)&ktmp, (char*)curKey);
|
||||||
|
|
||||||
|
if (pVal != NULL) *pVal = NULL;
|
||||||
|
if (pVLen != NULL) *pVLen = 0;
|
||||||
|
|
||||||
SStateSessionKey* pKTmp = &ktmp;
|
SStateSessionKey* pKTmp = &ktmp;
|
||||||
const char* vval = rocksdb_iter_value(pCur->iter, (size_t*)&vLen);
|
const char* vval = rocksdb_iter_value(pCur->iter, (size_t*)&vLen);
|
||||||
char* val = NULL;
|
char* val = NULL;
|
||||||
|
@ -1620,19 +1623,23 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey*
|
||||||
if (len < 0) {
|
if (len < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pKTmp->opNum != pCur->number) {
|
||||||
|
taosMemoryFree(val);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) {
|
||||||
|
taosMemoryFree(val);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
if (pVal != NULL) {
|
if (pVal != NULL) {
|
||||||
*pVal = (char*)val;
|
*pVal = (char*)val;
|
||||||
} else {
|
} else {
|
||||||
taosMemoryFree(val);
|
taosMemoryFree(val);
|
||||||
}
|
}
|
||||||
if (pVLen != NULL) *pVLen = len;
|
|
||||||
|
|
||||||
if (pKTmp->opNum != pCur->number) {
|
if (pVLen != NULL) *pVLen = len;
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
*pKey = pKTmp->key;
|
*pKey = pKTmp->key;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue