fix(tmq): time wait
This commit is contained in:
parent
130d6e3b6c
commit
ec7df42347
|
@ -29,7 +29,7 @@ typedef dispatch_semaphore_t tsem_t;
|
||||||
|
|
||||||
int tsem_init(tsem_t *sem, int pshared, unsigned int value);
|
int tsem_init(tsem_t *sem, int pshared, unsigned int value);
|
||||||
int tsem_wait(tsem_t *sem);
|
int tsem_wait(tsem_t *sem);
|
||||||
int tsem_timewait(tsem_t *sim, int64_t nanosecs);
|
int tsem_timewait(tsem_t *sim, int64_t milis);
|
||||||
int tsem_post(tsem_t *sem);
|
int tsem_post(tsem_t *sem);
|
||||||
int tsem_destroy(tsem_t *sem);
|
int tsem_destroy(tsem_t *sem);
|
||||||
|
|
||||||
|
@ -38,7 +38,7 @@ int tsem_destroy(tsem_t *sem);
|
||||||
#define tsem_t sem_t
|
#define tsem_t sem_t
|
||||||
#define tsem_init sem_init
|
#define tsem_init sem_init
|
||||||
int tsem_wait(tsem_t *sem);
|
int tsem_wait(tsem_t *sem);
|
||||||
int tsem_timewait(tsem_t *sim, int64_t nanosecs);
|
int tsem_timewait(tsem_t *sim, int64_t milis);
|
||||||
#define tsem_post sem_post
|
#define tsem_post sem_post
|
||||||
#define tsem_destroy sem_destroy
|
#define tsem_destroy sem_destroy
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,13 @@
|
||||||
#include "tref.h"
|
#include "tref.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
#undef tsem_post
|
||||||
|
#define tsem_post(x) \
|
||||||
|
tscInfo("call sem post at %s %d", __FUNCTION__, __LINE__); \
|
||||||
|
sem_post(x)
|
||||||
|
#endif
|
||||||
|
|
||||||
int32_t tmqAskEp(tmq_t* tmq, bool async);
|
int32_t tmqAskEp(tmq_t* tmq, bool async);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -733,12 +740,12 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
||||||
req.consumerId = tmq->consumerId;
|
req.consumerId = tmq->consumerId;
|
||||||
req.epoch = tmq->epoch;
|
req.epoch = tmq->epoch;
|
||||||
|
|
||||||
int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
|
int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
|
||||||
if (tlen < 0) {
|
if (tlen < 0) {
|
||||||
tscError("tSerializeSMqHbReq failed");
|
tscError("tSerializeSMqHbReq failed");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
void *pReq = taosMemoryCalloc(1, tlen);
|
void* pReq = taosMemoryCalloc(1, tlen);
|
||||||
if (tlen < 0) {
|
if (tlen < 0) {
|
||||||
tscError("failed to malloc MqHbReq msg, size:%d", tlen);
|
tscError("failed to malloc MqHbReq msg, size:%d", tlen);
|
||||||
return;
|
return;
|
||||||
|
@ -1397,12 +1404,12 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
|
||||||
req.epoch = tmq->epoch;
|
req.epoch = tmq->epoch;
|
||||||
strcpy(req.cgroup, tmq->groupId);
|
strcpy(req.cgroup, tmq->groupId);
|
||||||
|
|
||||||
int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
|
int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
|
||||||
if (tlen < 0) {
|
if (tlen < 0) {
|
||||||
tscError("tSerializeSMqAskEpReq failed");
|
tscError("tSerializeSMqAskEpReq failed");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
void *pReq = taosMemoryCalloc(1, tlen);
|
void* pReq = taosMemoryCalloc(1, tlen);
|
||||||
if (tlen < 0) {
|
if (tlen < 0) {
|
||||||
tscError("failed to malloc askEpReq msg, size:%d", tlen);
|
tscError("failed to malloc askEpReq msg, size:%d", tlen);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1461,7 +1468,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmqBuildConsumeReqImpl(SMqPollReq *pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
|
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
|
||||||
/*strcpy(pReq->topic, pTopic->topicName);*/
|
/*strcpy(pReq->topic, pTopic->topicName);*/
|
||||||
/*strcpy(pReq->cgroup, tmq->groupId);*/
|
/*strcpy(pReq->cgroup, tmq->groupId);*/
|
||||||
|
|
||||||
|
@ -1561,20 +1568,20 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
||||||
tsem_post(&tmq->rspSem);
|
tsem_post(&tmq->rspSem);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
char *msg = taosMemoryCalloc(1, msgSize);
|
char* msg = taosMemoryCalloc(1, msgSize);
|
||||||
if (NULL == msg) {
|
if (NULL == msg) {
|
||||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||||
tsem_post(&tmq->rspSem);
|
tsem_post(&tmq->rspSem);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
|
if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
|
||||||
taosMemoryFree(msg);
|
taosMemoryFree(msg);
|
||||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||||
tsem_post(&tmq->rspSem);
|
tsem_post(&tmq->rspSem);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
|
SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
|
||||||
if (pParam == NULL) {
|
if (pParam == NULL) {
|
||||||
taosMemoryFree(msg);
|
taosMemoryFree(msg);
|
||||||
|
@ -1797,17 +1804,20 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (timeout != -1) {
|
if (timeout != -1) {
|
||||||
int64_t endTime = taosGetTimestampMs();
|
int64_t currentTime = taosGetTimestampMs();
|
||||||
int64_t leftTime = endTime - startTime;
|
int64_t passedTime = currentTime - startTime;
|
||||||
if (leftTime > timeout) {
|
if (passedTime > timeout) {
|
||||||
tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", end time %" PRId64,
|
tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
|
||||||
tmq->consumerId, tmq->epoch, startTime, endTime);
|
tmq->consumerId, tmq->epoch, startTime, currentTime);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
tsem_timewait(&tmq->rspSem, leftTime * 1000);
|
/*tscInfo("consumer:%" PRId64 ", (epoch %d) wait, start time %" PRId64 ", current time %" PRId64*/
|
||||||
|
/*", left time %" PRId64,*/
|
||||||
|
/*tmq->consumerId, tmq->epoch, startTime, currentTime, (timeout - passedTime));*/
|
||||||
|
tsem_timewait(&tmq->rspSem, (timeout - passedTime));
|
||||||
} else {
|
} else {
|
||||||
// use tsem_timewait instead of tsem_wait to avoid unexpected stuck
|
// use tsem_timewait instead of tsem_wait to avoid unexpected stuck
|
||||||
tsem_timewait(&tmq->rspSem, 500 * 1000);
|
tsem_timewait(&tmq->rspSem, 1000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,17 +75,18 @@ int32_t tsem_wait(tsem_t* sem) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
|
int32_t tsem_timewait(tsem_t* sem, int64_t milis) {
|
||||||
struct timespec ts, rel;
|
return tsem_wait(sem);
|
||||||
FILETIME ft_before, ft_after;
|
#if 0
|
||||||
int rc;
|
struct timespec ts;
|
||||||
|
timespec_get(&ts);
|
||||||
|
ts.tv_nsec += ms * 1000000;
|
||||||
|
ts.tv_sec += ts.tv_nsec / 1000000000;
|
||||||
|
ts.tv_nsec %= 1000000000;
|
||||||
|
|
||||||
rel.tv_sec = 0;
|
/*GetSystemTimeAsFileTime(&ft_before);*/
|
||||||
rel.tv_nsec = nanosecs;
|
|
||||||
|
|
||||||
GetSystemTimeAsFileTime(&ft_before);
|
|
||||||
// errno = 0;
|
// errno = 0;
|
||||||
rc = sem_timedwait(sem, pthread_win32_getabstime_np(&ts, &rel));
|
rc = sem_timedwait(sem, ts);
|
||||||
|
|
||||||
/* This should have timed out */
|
/* This should have timed out */
|
||||||
// assert(errno == ETIMEDOUT);
|
// assert(errno == ETIMEDOUT);
|
||||||
|
@ -102,6 +103,7 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
|
||||||
// return 1;
|
// return 1;
|
||||||
// }
|
// }
|
||||||
return rc;
|
return rc;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
#elif defined(_TD_DARWIN_64)
|
#elif defined(_TD_DARWIN_64)
|
||||||
|
@ -133,9 +135,9 @@ int tsem_wait(tsem_t *psem) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsem_timewait(tsem_t *psem, int64_t nanosecs) {
|
int tsem_timewait(tsem_t *psem, int64_t milis) {
|
||||||
if (psem == NULL || *psem == NULL) return -1;
|
if (psem == NULL || *psem == NULL) return -1;
|
||||||
dispatch_semaphore_wait(*psem, nanosecs);
|
dispatch_semaphore_wait(*psem, milis * 1000 * 1000);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -227,15 +229,20 @@ int32_t tsem_wait(tsem_t* sem) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
|
int32_t tsem_timewait(tsem_t* sem, int64_t ms) {
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
|
|
||||||
struct timespec tv = {
|
struct timespec ts = {0};
|
||||||
.tv_sec = 0,
|
|
||||||
.tv_nsec = nanosecs,
|
|
||||||
};
|
|
||||||
|
|
||||||
while ((ret = sem_timedwait(sem, &tv)) == -1 && errno == EINTR) continue;
|
if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
ts.tv_nsec += ms * 1000000;
|
||||||
|
ts.tv_sec += ts.tv_nsec / 1000000000;
|
||||||
|
ts.tv_nsec %= 1000000000;
|
||||||
|
|
||||||
|
while ((ret = sem_timedwait(sem, &ts)) == -1 && errno == EINTR) continue;
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue