fix tmq crash issue
This commit is contained in:
parent
294b54004f
commit
38e6bf3b6e
|
@ -727,7 +727,9 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pNodeList);
|
code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pNodeList);
|
||||||
tscError("0x%"PRIx64" failed to create query plan, code:%s 0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId);
|
if (code) {
|
||||||
|
tscError("0x%"PRIx64" failed to create query plan, code:%s 0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
|
|
@ -1162,7 +1162,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
||||||
pParam->vgId = pVg->vgId;
|
pParam->vgId = pVg->vgId;
|
||||||
pParam->epoch = tmq->epoch;
|
pParam->epoch = tmq->epoch;
|
||||||
|
|
||||||
SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
|
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||||
if (sendInfo == NULL) {
|
if (sendInfo == NULL) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
taosMemoryFree(pParam);
|
taosMemoryFree(pParam);
|
||||||
|
|
|
@ -334,6 +334,7 @@ typedef struct {
|
||||||
int64_t dbUid;
|
int64_t dbUid;
|
||||||
int32_t tagVer;
|
int32_t tagVer;
|
||||||
int32_t colVer;
|
int32_t colVer;
|
||||||
|
int32_t smaVer;
|
||||||
int32_t nextColId;
|
int32_t nextColId;
|
||||||
float xFilesFactor;
|
float xFilesFactor;
|
||||||
int32_t delay;
|
int32_t delay;
|
||||||
|
|
|
@ -1514,7 +1514,7 @@ int32_t schExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes) {
|
||||||
if (EXPLAIN_MODE_STATIC == pReq->pDag->explainInfo.mode) {
|
if (EXPLAIN_MODE_STATIC == pReq->pDag->explainInfo.mode) {
|
||||||
SCH_ERR_JRET(schExecStaticExplainJob(pReq, pJob, true));
|
SCH_ERR_JRET(schExecStaticExplainJob(pReq, pJob, true));
|
||||||
} else {
|
} else {
|
||||||
SCH_ERR_JRET(schExecJobImpl(pReq, pJob, pRes, true));
|
SCH_ERR_JRET(schExecJobImpl(pReq, pJob, NULL, true));
|
||||||
}
|
}
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
|
@ -490,6 +490,7 @@ void* consumeThreadFunc(void* param) {
|
||||||
build_consumer(pInfo);
|
build_consumer(pInfo);
|
||||||
build_topic_list(pInfo);
|
build_topic_list(pInfo);
|
||||||
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
|
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
|
||||||
|
assert(0);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue