diff --git a/include/os/osSemaphore.h b/include/os/osSemaphore.h index e52da96f01..5fc89d9d24 100644 --- a/include/os/osSemaphore.h +++ b/include/os/osSemaphore.h @@ -29,7 +29,7 @@ typedef dispatch_semaphore_t tsem_t; int tsem_init(tsem_t *sem, int pshared, unsigned int value); int tsem_wait(tsem_t *sem); -int tsem_timewait(tsem_t *sim, int64_t nanosecs); +int tsem_timewait(tsem_t *sim, int64_t milis); int tsem_post(tsem_t *sem); int tsem_destroy(tsem_t *sem); @@ -38,7 +38,7 @@ int tsem_destroy(tsem_t *sem); #define tsem_t sem_t #define tsem_init sem_init int tsem_wait(tsem_t *sem); -int tsem_timewait(tsem_t *sim, int64_t nanosecs); +int tsem_timewait(tsem_t *sim, int64_t milis); #define tsem_post sem_post #define tsem_destroy sem_destroy diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 1dd3174c29..ade0c95227 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -25,6 +25,13 @@ #include "tref.h" #include "ttimer.h" +#if 0 +#undef tsem_post +#define tsem_post(x) \ + tscInfo("call sem post at %s %d", __FUNCTION__, __LINE__); \ + sem_post(x) +#endif + int32_t tmqAskEp(tmq_t* tmq, bool async); typedef struct { @@ -733,12 +740,12 @@ void tmqSendHbReq(void* param, void* tmrId) { req.consumerId = tmq->consumerId; req.epoch = tmq->epoch; - int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req); + int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req); if (tlen < 0) { tscError("tSerializeSMqHbReq failed"); return; } - void *pReq = taosMemoryCalloc(1, tlen); + void* pReq = taosMemoryCalloc(1, tlen); if (tlen < 0) { tscError("failed to malloc MqHbReq msg, size:%d", tlen); return; @@ -1397,12 +1404,12 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { req.epoch = tmq->epoch; strcpy(req.cgroup, tmq->groupId); - int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req); + int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req); if (tlen < 0) { tscError("tSerializeSMqAskEpReq failed"); return -1; } - void *pReq = taosMemoryCalloc(1, tlen); + void* pReq = taosMemoryCalloc(1, tlen); if (tlen < 0) { tscError("failed to malloc askEpReq msg, size:%d", tlen); return -1; @@ -1461,7 +1468,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { return code; } -void tmqBuildConsumeReqImpl(SMqPollReq *pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) { +void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) { /*strcpy(pReq->topic, pTopic->topicName);*/ /*strcpy(pReq->cgroup, tmq->groupId);*/ @@ -1561,20 +1568,20 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { tsem_post(&tmq->rspSem); return -1; } - char *msg = taosMemoryCalloc(1, msgSize); + char* msg = taosMemoryCalloc(1, msgSize); if (NULL == msg) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); tsem_post(&tmq->rspSem); return -1; } - + if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) { taosMemoryFree(msg); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); tsem_post(&tmq->rspSem); return -1; } - + SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam)); if (pParam == NULL) { taosMemoryFree(msg); @@ -1797,17 +1804,20 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { return NULL; } if (timeout != -1) { - int64_t endTime = taosGetTimestampMs(); - int64_t leftTime = endTime - startTime; - if (leftTime > timeout) { - tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", end time %" PRId64, - tmq->consumerId, tmq->epoch, startTime, endTime); + int64_t currentTime = taosGetTimestampMs(); + int64_t passedTime = currentTime - startTime; + if (passedTime > timeout) { + tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64, + tmq->consumerId, tmq->epoch, startTime, currentTime); return NULL; } - tsem_timewait(&tmq->rspSem, leftTime * 1000); + /*tscInfo("consumer:%" PRId64 ", (epoch %d) wait, start time %" PRId64 ", current time %" PRId64*/ + /*", left time %" PRId64,*/ + /*tmq->consumerId, tmq->epoch, startTime, currentTime, (timeout - passedTime));*/ + tsem_timewait(&tmq->rspSem, (timeout - passedTime)); } else { // use tsem_timewait instead of tsem_wait to avoid unexpected stuck - tsem_timewait(&tmq->rspSem, 500 * 1000); + tsem_timewait(&tmq->rspSem, 1000); } } } diff --git a/source/os/src/osSemaphore.c b/source/os/src/osSemaphore.c index 310804da8d..53d8dad226 100644 --- a/source/os/src/osSemaphore.c +++ b/source/os/src/osSemaphore.c @@ -75,17 +75,18 @@ int32_t tsem_wait(tsem_t* sem) { return ret; } -int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { - struct timespec ts, rel; - FILETIME ft_before, ft_after; - int rc; +int32_t tsem_timewait(tsem_t* sem, int64_t milis) { + return tsem_wait(sem); +#if 0 + struct timespec ts; + timespec_get(&ts); + ts.tv_nsec += ms * 1000000; + ts.tv_sec += ts.tv_nsec / 1000000000; + ts.tv_nsec %= 1000000000; - rel.tv_sec = 0; - rel.tv_nsec = nanosecs; - - GetSystemTimeAsFileTime(&ft_before); + /*GetSystemTimeAsFileTime(&ft_before);*/ // errno = 0; - rc = sem_timedwait(sem, pthread_win32_getabstime_np(&ts, &rel)); + rc = sem_timedwait(sem, ts); /* This should have timed out */ // assert(errno == ETIMEDOUT); @@ -102,6 +103,7 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { // return 1; // } return rc; +#endif } #elif defined(_TD_DARWIN_64) @@ -133,9 +135,9 @@ int tsem_wait(tsem_t *psem) { return 0; } -int tsem_timewait(tsem_t *psem, int64_t nanosecs) { +int tsem_timewait(tsem_t *psem, int64_t milis) { if (psem == NULL || *psem == NULL) return -1; - dispatch_semaphore_wait(*psem, nanosecs); + dispatch_semaphore_wait(*psem, milis * 1000 * 1000); return 0; } @@ -227,15 +229,20 @@ int32_t tsem_wait(tsem_t* sem) { return ret; } -int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { +int32_t tsem_timewait(tsem_t* sem, int64_t ms) { int ret = 0; - struct timespec tv = { - .tv_sec = 0, - .tv_nsec = nanosecs, - }; + struct timespec ts = {0}; - while ((ret = sem_timedwait(sem, &tv)) == -1 && errno == EINTR) continue; + if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { + return -1; + } + + ts.tv_nsec += ms * 1000000; + ts.tv_sec += ts.tv_nsec / 1000000000; + ts.tv_nsec %= 1000000000; + + while ((ret = sem_timedwait(sem, &ts)) == -1 && errno == EINTR) continue; return ret; }