Merge remote-tracking branch 'origin/enh/rocksdbSstate' into enh/rocksdbSstate

This commit is contained in:
Haojun Liao 2023-05-15 13:49:38 +08:00
commit af1ef319a4
3 changed files with 8 additions and 2 deletions

View File

@ -297,7 +297,6 @@ typedef struct SPartitionBySupporter {
typedef struct SPartitionDataInfo { typedef struct SPartitionDataInfo {
uint64_t groupId; uint64_t groupId;
char* tbname; char* tbname;
SArray* tags;
SArray* rowIds; SArray* rowIds;
} SPartitionDataInfo; } SPartitionDataInfo;

View File

@ -1215,6 +1215,11 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) {
return pBlock; return pBlock;
} }
void freePartItem(void* ptr) {
SPartitionDataInfo* pPart = (SPartitionDataInfo*)ptr;
taosArrayDestroy(pPart->rowIds);
}
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -1293,6 +1298,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
taosHashSetFreeFp(pInfo->pPartitions, freePartItem);
pInfo->tsColIndex = 0; pInfo->tsColIndex = 0;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);

View File

@ -149,7 +149,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
int32_t batchCnt = 0; int32_t batchCnt = 0;
while (1) { while (1) {
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
taosArrayDestroy(pRes); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return 0; return 0;
} }
@ -203,6 +203,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
code = streamTaskOutput(pTask, qRes); code = streamTaskOutput(pTask, qRes);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
taosFreeQitem(pRes); taosFreeQitem(pRes);
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return code; return code;
} }