fix(tmq): fix the invalid write.
This commit is contained in:
parent
ed21ef0420
commit
066a173d28
|
@ -113,6 +113,7 @@ struct tmq_t {
|
||||||
|
|
||||||
typedef struct SAskEpInfo {
|
typedef struct SAskEpInfo {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
tsem_t sem;
|
||||||
} SAskEpInfo;
|
} SAskEpInfo;
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
|
@ -2138,8 +2139,9 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tsem_wait(&pInfo->sem);
|
tsem_wait(&pInfo->sem);
|
||||||
|
|
||||||
code = pInfo->code;
|
code = pInfo->code;
|
||||||
|
|
||||||
|
tsem_destroy(&pInfo->sem);
|
||||||
taosMemoryFree(pInfo);
|
taosMemoryFree(pInfo);
|
||||||
|
|
||||||
tscDebug("consumer:0x%"PRIx64" sync commit done, code:%s", tmq->consumerId, tstrerror(code));
|
tscDebug("consumer:0x%"PRIx64" sync commit done, code:%s", tmq->consumerId, tstrerror(code));
|
||||||
|
@ -2159,7 +2161,7 @@ void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* par
|
||||||
tDeleteSMqAskEpRsp(&rsp);
|
tDeleteSMqAskEpRsp(&rsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsem_post(&pTmq->rspSem);
|
tsem_post(&pInfo->sem);
|
||||||
}
|
}
|
||||||
|
|
||||||
void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
|
void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
|
||||||
|
@ -2186,11 +2188,13 @@ void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* p
|
||||||
|
|
||||||
int32_t doAskEp(tmq_t* pTmq) {
|
int32_t doAskEp(tmq_t* pTmq) {
|
||||||
SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
|
SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
|
||||||
|
tsem_init(&pInfo->sem, 0, 0);
|
||||||
|
|
||||||
asyncAskEp(pTmq, updateEpCallbackFn, pInfo);
|
asyncAskEp(pTmq, updateEpCallbackFn, pInfo);
|
||||||
tsem_wait(&pTmq->rspSem);
|
tsem_wait(&pInfo->sem);
|
||||||
|
|
||||||
int32_t code = pInfo->code;
|
int32_t code = pInfo->code;
|
||||||
|
tsem_destroy(&pInfo->sem);
|
||||||
taosMemoryFree(pInfo);
|
taosMemoryFree(pInfo);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue