Merge pull request #13745 from taosdata/feature/stream
enh(tmq): avoid uninitialized memory
This commit is contained in:
commit
32151de1f0
|
@ -37,14 +37,6 @@ enum {
|
||||||
TMQ_MSG_TYPE__EP_RSP,
|
TMQ_MSG_TYPE__EP_RSP,
|
||||||
};
|
};
|
||||||
|
|
||||||
enum {
|
|
||||||
STREAM_TRIGGER__AT_ONCE = 1,
|
|
||||||
STREAM_TRIGGER__WINDOW_CLOSE,
|
|
||||||
STREAM_TRIGGER__BY_COUNT,
|
|
||||||
STREAM_TRIGGER__BY_BATCH_COUNT,
|
|
||||||
STREAM_TRIGGER__BY_EVENT_TIME,
|
|
||||||
};
|
|
||||||
|
|
||||||
typedef enum EStreamType {
|
typedef enum EStreamType {
|
||||||
STREAM_NORMAL = 1,
|
STREAM_NORMAL = 1,
|
||||||
STREAM_INVERT,
|
STREAM_INVERT,
|
||||||
|
|
|
@ -408,7 +408,7 @@ int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_
|
||||||
pParam->userParam = userParam;
|
pParam->userParam = userParam;
|
||||||
if (!async) tsem_init(&pParam->rspSem, 0, 0);
|
if (!async) tsem_init(&pParam->rspSem, 0, 0);
|
||||||
|
|
||||||
sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
|
sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||||
if (sendInfo == NULL) goto END;
|
if (sendInfo == NULL) goto END;
|
||||||
sendInfo->msgInfo = (SDataBuf){
|
sendInfo->msgInfo = (SDataBuf){
|
||||||
.pData = buf,
|
.pData = buf,
|
||||||
|
@ -704,7 +704,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
void* abuf = buf;
|
void* abuf = buf;
|
||||||
tSerializeSCMSubscribeReq(&abuf, &req);
|
tSerializeSCMSubscribeReq(&abuf, &req);
|
||||||
|
|
||||||
SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
|
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||||
if (sendInfo == NULL) goto FAIL;
|
if (sendInfo == NULL) goto FAIL;
|
||||||
|
|
||||||
SMqSubscribeCbParam param = {
|
SMqSubscribeCbParam param = {
|
||||||
|
@ -1008,7 +1008,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
|
||||||
pParam->async = async;
|
pParam->async = async;
|
||||||
tsem_init(&pParam->rspSem, 0, 0);
|
tsem_init(&pParam->rspSem, 0, 0);
|
||||||
|
|
||||||
SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
|
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||||
if (sendInfo == NULL) {
|
if (sendInfo == NULL) {
|
||||||
tsem_destroy(&pParam->rspSem);
|
tsem_destroy(&pParam->rspSem);
|
||||||
taosMemoryFree(pParam);
|
taosMemoryFree(pParam);
|
||||||
|
@ -1162,7 +1162,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
||||||
pParam->vgId = pVg->vgId;
|
pParam->vgId = pVg->vgId;
|
||||||
pParam->epoch = tmq->epoch;
|
pParam->epoch = tmq->epoch;
|
||||||
|
|
||||||
SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
|
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||||
if (sendInfo == NULL) {
|
if (sendInfo == NULL) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
taosMemoryFree(pParam);
|
taosMemoryFree(pParam);
|
||||||
|
|
|
@ -351,9 +351,15 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
ASSERT(totLevel <= 2);
|
ASSERT(totLevel <= 2);
|
||||||
pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
|
pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
|
||||||
|
|
||||||
bool hasExtraSink = false;
|
bool hasExtraSink = false;
|
||||||
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
|
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
|
||||||
if (totLevel == 2 || externalTargetDB) {
|
SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
|
||||||
|
ASSERT(pDbObj != NULL);
|
||||||
|
sdbRelease(pSdb, pDbObj);
|
||||||
|
|
||||||
|
bool multiTarget = pDbObj->cfg.numOfVgroups > 1;
|
||||||
|
|
||||||
|
if (totLevel == 2 || externalTargetDB || multiTarget) {
|
||||||
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
|
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
|
||||||
taosArrayPush(pStream->tasks, &taskOneLevel);
|
taosArrayPush(pStream->tasks, &taskOneLevel);
|
||||||
// add extra sink
|
// add extra sink
|
||||||
|
|
|
@ -136,6 +136,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
pReq->subKey);
|
pReq->subKey);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pHandle->consumerId != consumerId) {
|
if (pHandle->consumerId != consumerId) {
|
||||||
tqError("tmq poll: consumer handle mismatch for consumer %ld in vg %d, subkey %s, handle consumer id %ld",
|
tqError("tmq poll: consumer handle mismatch for consumer %ld in vg %d, subkey %s, handle consumer id %ld",
|
||||||
consumerId, pTq->pVnode->config.vgId, pReq->subKey, pHandle->consumerId);
|
consumerId, pTq->pVnode->config.vgId, pReq->subKey, pHandle->consumerId);
|
||||||
|
|
|
@ -110,9 +110,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
// continue dispatch
|
// continue dispatch
|
||||||
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
|
streamDispatch(pTask, pMsgCb);
|
||||||
streamDispatch(pTask, pMsgCb);
|
|
||||||
}
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -182,6 +182,7 @@ FAIL:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||||
|
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
|
||||||
#if 1
|
#if 1
|
||||||
int8_t old =
|
int8_t old =
|
||||||
atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
|
atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
|
||||||
|
|
Loading…
Reference in New Issue