fix(stream): memory error
This commit is contained in:
parent
8a9e729db2
commit
fb52cfa816
|
@ -108,6 +108,10 @@ typedef struct {
|
||||||
|
|
||||||
// exec
|
// exec
|
||||||
STqExecHandle execHandle;
|
STqExecHandle execHandle;
|
||||||
|
|
||||||
|
// prevent drop
|
||||||
|
int64_t ntbUid;
|
||||||
|
SArray* colIdList; // SArray<int32_t>
|
||||||
} STqHandle;
|
} STqHandle;
|
||||||
|
|
||||||
struct STQ {
|
struct STQ {
|
||||||
|
|
|
@ -142,6 +142,7 @@ void tqClose(STQ*);
|
||||||
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
|
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
|
||||||
int tqCommit(STQ*);
|
int tqCommit(STQ*);
|
||||||
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
|
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
|
||||||
|
int32_t tqCheckColModifiable(STQ* pTq, int32_t colId);
|
||||||
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
|
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
|
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen);
|
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||||
|
|
|
@ -208,6 +208,26 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tqCheckColModifiable(STQ* pTq, int32_t colId) {
|
||||||
|
void* pIter = NULL;
|
||||||
|
while (1) {
|
||||||
|
pIter = taosHashIterate(pTq->handles, pIter);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
STqHandle* pExec = (STqHandle*)pIter;
|
||||||
|
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
|
int32_t sz = taosArrayGetSize(pExec->colIdList);
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
int32_t forbidColId = *(int32_t*)taosArrayGet(pExec->colIdList, i);
|
||||||
|
if (forbidColId == colId) {
|
||||||
|
taosHashCancelIterate(pTq->handles, pIter);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) {
|
static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) {
|
||||||
pRsp->reqOffset = pReq->reqOffset;
|
pRsp->reqOffset = pReq->reqOffset;
|
||||||
|
|
||||||
|
@ -752,8 +772,9 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
|
|
||||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
|
||||||
if (ppTask) {
|
if (ppTask) {
|
||||||
|
SStreamTask* pTask = *ppTask;
|
||||||
taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
|
taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
|
||||||
atomic_store_8(&(*ppTask)->taskStatus, TASK_STATUS__DROPPING);
|
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
|
||||||
}
|
}
|
||||||
// todo
|
// todo
|
||||||
// clear queue
|
// clear queue
|
||||||
|
|
|
@ -1519,6 +1519,7 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
blockDataDestroy(pStreamScan->pPullDataRes);
|
blockDataDestroy(pStreamScan->pPullDataRes);
|
||||||
blockDataDestroy(pStreamScan->pDeleteDataRes);
|
blockDataDestroy(pStreamScan->pDeleteDataRes);
|
||||||
taosArrayDestroy(pStreamScan->pBlockLists);
|
taosArrayDestroy(pStreamScan->pBlockLists);
|
||||||
|
taosArrayDestroy(pStreamScan->tsArray);
|
||||||
taosMemoryFree(pStreamScan);
|
taosMemoryFree(pStreamScan);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue