diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index a627d5f190..ade16b7f1a 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -113,6 +113,7 @@ struct tmq_t { typedef struct SAskEpInfo { int32_t code; + tsem_t sem; } SAskEpInfo; enum { @@ -2138,8 +2139,9 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { } tsem_wait(&pInfo->sem); - code = pInfo->code; + + tsem_destroy(&pInfo->sem); taosMemoryFree(pInfo); tscDebug("consumer:0x%"PRIx64" sync commit done, code:%s", tmq->consumerId, tstrerror(code)); @@ -2159,7 +2161,7 @@ void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* par tDeleteSMqAskEpRsp(&rsp); } - tsem_post(&pTmq->rspSem); + tsem_post(&pInfo->sem); } void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) { @@ -2186,11 +2188,13 @@ void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* p int32_t doAskEp(tmq_t* pTmq) { SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo)); + tsem_init(&pInfo->sem, 0, 0); asyncAskEp(pTmq, updateEpCallbackFn, pInfo); - tsem_wait(&pTmq->rspSem); + tsem_wait(&pInfo->sem); int32_t code = pInfo->code; + tsem_destroy(&pInfo->sem); taosMemoryFree(pInfo); return code; }