diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 93117c934f..fcd88ed8d7 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1601,6 +1601,11 @@ void tmqFreeImpl(void* handle) { static void tmqMgmtInit(void) { tmqInitRes = 0; + + if (taosThreadMutexInit(&tmqMgmt.lock, NULL) != 0){ + goto END; + } + tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ"); if (tmqMgmt.timer == NULL) { @@ -1612,10 +1617,6 @@ static void tmqMgmtInit(void) { goto END; } - if (taosThreadMutexInit(&tmqMgmt.lock, NULL) != 0){ - goto END; - } - return; END: tmqInitRes = terrno; @@ -1627,8 +1628,8 @@ void tmqMgmtClose(void) { tmqMgmt.timer = NULL; } - (void) taosThreadMutexLock(&tmqMgmt.lock); - if (tmqMgmt.rsetId >= 0) { + if (tmqMgmt.rsetId > 0) { + (void) taosThreadMutexLock(&tmqMgmt.lock); tmq_t *tmq = taosIterateRef(tmqMgmt.rsetId, 0); int64_t refId = 0; @@ -1647,8 +1648,8 @@ void tmqMgmtClose(void) { } taosCloseRef(tmqMgmt.rsetId); tmqMgmt.rsetId = -1; + (void)taosThreadMutexUnlock(&tmqMgmt.lock); } - (void)taosThreadMutexUnlock(&tmqMgmt.lock); } tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {