Merge pull request #11908 from taosdata/feature/tq
enh(tmq): use sem_timewait to reduce busy loop
This commit is contained in:
commit
84b19fbea2
|
@ -2383,7 +2383,7 @@ typedef struct {
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
uint64_t reqId;
|
uint64_t reqId;
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
int64_t blockingTime;
|
int64_t waitTime;
|
||||||
int64_t currentOffset;
|
int64_t currentOffset;
|
||||||
} SMqPollReqV2;
|
} SMqPollReqV2;
|
||||||
|
|
||||||
|
@ -2400,53 +2400,6 @@ typedef struct {
|
||||||
SSchemaWrapper schema;
|
SSchemaWrapper schema;
|
||||||
} SMqSubTopicEp;
|
} SMqSubTopicEp;
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SMqRspHead head;
|
|
||||||
int64_t reqOffset;
|
|
||||||
int64_t rspOffset;
|
|
||||||
int32_t skipLogNum;
|
|
||||||
int32_t dataLen;
|
|
||||||
SArray* blockPos; // beginning pos for each SRetrieveTableRsp
|
|
||||||
void* blockData; // serialized batched SRetrieveTableRsp
|
|
||||||
} SMqPollRspV2;
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSMqPollRspV2(void** buf, const SMqPollRspV2* pRsp) {
|
|
||||||
int32_t tlen = 0;
|
|
||||||
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
|
|
||||||
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
|
|
||||||
tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum);
|
|
||||||
tlen += taosEncodeFixedI32(buf, pRsp->dataLen);
|
|
||||||
if (pRsp->dataLen != 0) {
|
|
||||||
int32_t sz = taosArrayGetSize(pRsp->blockPos);
|
|
||||||
tlen += taosEncodeFixedI32(buf, sz);
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
int32_t blockPos = *(int32_t*)taosArrayGet(pRsp->blockPos, i);
|
|
||||||
tlen += taosEncodeFixedI32(buf, blockPos);
|
|
||||||
}
|
|
||||||
tlen += taosEncodeBinary(buf, pRsp->blockData, pRsp->dataLen);
|
|
||||||
}
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void* tDecodeSMqPollRspV2(const void* buf, SMqPollRspV2* pRsp) {
|
|
||||||
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
|
|
||||||
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
|
|
||||||
buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
|
|
||||||
buf = taosDecodeFixedI32(buf, &pRsp->dataLen);
|
|
||||||
if (pRsp->dataLen != 0) {
|
|
||||||
int32_t sz;
|
|
||||||
buf = taosDecodeFixedI32(buf, &sz);
|
|
||||||
pRsp->blockPos = taosArrayInit(sz, sizeof(int32_t));
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
int32_t blockPos;
|
|
||||||
buf = taosDecodeFixedI32(buf, &blockPos);
|
|
||||||
taosArrayPush(pRsp->blockPos, &blockPos);
|
|
||||||
}
|
|
||||||
buf = taosDecodeBinary(buf, &pRsp->blockData, pRsp->dataLen);
|
|
||||||
}
|
|
||||||
return (void*)buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMqRspHead head;
|
SMqRspHead head;
|
||||||
int64_t reqOffset;
|
int64_t reqOffset;
|
||||||
|
|
|
@ -72,8 +72,6 @@ extern "C" {
|
||||||
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
||||||
#define WAL_MAGIC 0xFAFBFCFDULL
|
#define WAL_MAGIC 0xFAFBFCFDULL
|
||||||
|
|
||||||
#define WAL_CUR_FAILED 1
|
|
||||||
|
|
||||||
#pragma pack(push, 1)
|
#pragma pack(push, 1)
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TAOS_WAL_NOLOG = 0,
|
TAOS_WAL_NOLOG = 0,
|
||||||
|
|
|
@ -68,7 +68,7 @@ struct tmq_conf_t {
|
||||||
char* user;
|
char* user;
|
||||||
char* pass;
|
char* pass;
|
||||||
char* db;
|
char* db;
|
||||||
tmq_commit_cb* commit_cb;
|
tmq_commit_cb* commitCb;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct tmq_t {
|
struct tmq_t {
|
||||||
|
@ -115,8 +115,8 @@ enum {
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
TMQ_CONSUMER_STATUS__INIT = 0,
|
TMQ_CONSUMER_STATUS__INIT = 0,
|
||||||
TMQ_CONSUMER_STATUS__SUBSCRIBED,
|
|
||||||
TMQ_CONSUMER_STATUS__READY,
|
TMQ_CONSUMER_STATUS__READY,
|
||||||
|
TMQ_CONSUMER_STATUS__NO_TOPIC,
|
||||||
};
|
};
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
|
@ -300,6 +300,7 @@ void tmqAssignDelayedHbTask(void* param, void* tmrId) {
|
||||||
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t));
|
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t));
|
||||||
*pTaskType = TMQ_DELAYED_TASK__HB;
|
*pTaskType = TMQ_DELAYED_TASK__HB;
|
||||||
taosWriteQitem(tmq->delayedTask, pTaskType);
|
taosWriteQitem(tmq->delayedTask, pTaskType);
|
||||||
|
tsem_post(&tmq->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
|
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
|
||||||
|
@ -307,6 +308,7 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
|
||||||
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t));
|
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t));
|
||||||
*pTaskType = TMQ_DELAYED_TASK__COMMIT;
|
*pTaskType = TMQ_DELAYED_TASK__COMMIT;
|
||||||
taosWriteQitem(tmq->delayedTask, pTaskType);
|
taosWriteQitem(tmq->delayedTask, pTaskType);
|
||||||
|
tsem_post(&tmq->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmqAssignDelayedReportTask(void* param, void* tmrId) {
|
void tmqAssignDelayedReportTask(void* param, void* tmrId) {
|
||||||
|
@ -314,6 +316,7 @@ void tmqAssignDelayedReportTask(void* param, void* tmrId) {
|
||||||
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t));
|
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t));
|
||||||
*pTaskType = TMQ_DELAYED_TASK__REPORT;
|
*pTaskType = TMQ_DELAYED_TASK__REPORT;
|
||||||
taosWriteQitem(tmq->delayedTask, pTaskType);
|
taosWriteQitem(tmq->delayedTask, pTaskType);
|
||||||
|
tsem_post(&tmq->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
|
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
|
||||||
|
@ -364,7 +367,6 @@ int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
|
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
|
||||||
pParam->rspErr = code;
|
pParam->rspErr = code;
|
||||||
tmq_t* tmq = pParam->tmq;
|
tmq_t* tmq = pParam->tmq;
|
||||||
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__SUBSCRIBED);
|
|
||||||
tsem_post(&pParam->rspSem);
|
tsem_post(&pParam->rspSem);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -475,7 +477,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
strcpy(pTmq->groupId, conf->groupId);
|
strcpy(pTmq->groupId, conf->groupId);
|
||||||
pTmq->autoCommit = conf->autoCommit;
|
pTmq->autoCommit = conf->autoCommit;
|
||||||
pTmq->autoCommitInterval = conf->autoCommitInterval;
|
pTmq->autoCommitInterval = conf->autoCommitInterval;
|
||||||
pTmq->commit_cb = conf->commit_cb;
|
pTmq->commit_cb = conf->commitCb;
|
||||||
pTmq->resetOffsetCfg = conf->resetOffset;
|
pTmq->resetOffsetCfg = conf->resetOffset;
|
||||||
|
|
||||||
// assign consumerId
|
// assign consumerId
|
||||||
|
@ -686,7 +688,7 @@ FAIL:
|
||||||
|
|
||||||
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) {
|
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) {
|
||||||
//
|
//
|
||||||
conf->commit_cb = cb;
|
conf->commitCb = cb;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) {
|
TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) {
|
||||||
|
@ -798,7 +800,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
// do not write into queue since updating epoch reset
|
// do not write into queue since updating epoch reset
|
||||||
tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", pParam->vgId, msgEpoch,
|
tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", pParam->vgId, msgEpoch,
|
||||||
tmqEpoch);
|
tmqEpoch);
|
||||||
/*tsem_post(&tmq->rspSem);*/
|
tsem_post(&tmq->rspSem);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -843,14 +845,14 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset);
|
pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset);
|
||||||
|
|
||||||
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
||||||
/*tsem_post(&tmq->rspSem);*/
|
tsem_post(&tmq->rspSem);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
CREATE_MSG_FAIL:
|
CREATE_MSG_FAIL:
|
||||||
if (pParam->epoch == tmq->epoch) {
|
if (pParam->epoch == tmq->epoch) {
|
||||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||||
}
|
}
|
||||||
/*tsem_post(&tmq->rspSem);*/
|
tsem_post(&tmq->rspSem);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -927,6 +929,12 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
|
||||||
if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
|
if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
|
||||||
taosHashCleanup(pHash);
|
taosHashCleanup(pHash);
|
||||||
tmq->clientTopics = newTopics;
|
tmq->clientTopics = newTopics;
|
||||||
|
|
||||||
|
if (taosArrayGetSize(tmq->clientTopics) == 0)
|
||||||
|
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
|
||||||
|
else
|
||||||
|
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
|
||||||
|
|
||||||
atomic_store_32(&tmq->epoch, epoch);
|
atomic_store_32(&tmq->epoch, epoch);
|
||||||
return set;
|
return set;
|
||||||
}
|
}
|
||||||
|
@ -955,9 +963,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
|
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
|
||||||
/*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
|
/*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
|
||||||
/*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
|
/*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
|
||||||
if (tmqUpdateEp(tmq, head->epoch, &rsp)) {
|
tmqUpdateEp(tmq, head->epoch, &rsp);
|
||||||
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
|
|
||||||
}
|
|
||||||
tDeleteSMqAskEpRsp(&rsp);
|
tDeleteSMqAskEpRsp(&rsp);
|
||||||
} else {
|
} else {
|
||||||
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper));
|
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper));
|
||||||
|
@ -972,7 +978,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
|
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
|
||||||
|
|
||||||
taosWriteQitem(tmq->mqueue, pWrapper);
|
taosWriteQitem(tmq->mqueue, pWrapper);
|
||||||
/*tsem_post(&tmq->rspSem);*/
|
tsem_post(&tmq->rspSem);
|
||||||
taosMemoryFree(pParam);
|
taosMemoryFree(pParam);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1076,7 +1082,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
|
||||||
return TMQ_RESP_ERR__FAIL;
|
return TMQ_RESP_ERR__FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqPollReqV2* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTopic* pTopic, SMqClientVg* pVg) {
|
SMqPollReqV2* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic* pTopic, SMqClientVg* pVg) {
|
||||||
int64_t reqOffset;
|
int64_t reqOffset;
|
||||||
if (pVg->currentOffset >= 0) {
|
if (pVg->currentOffset >= 0) {
|
||||||
reqOffset = pVg->currentOffset;
|
reqOffset = pVg->currentOffset;
|
||||||
|
@ -1101,7 +1107,7 @@ SMqPollReqV2* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClient
|
||||||
pReq->subKey[tlen] = TMQ_SEPARATOR;
|
pReq->subKey[tlen] = TMQ_SEPARATOR;
|
||||||
strcpy(pReq->subKey + tlen + 1, pTopic->topicName);
|
strcpy(pReq->subKey + tlen + 1, pTopic->topicName);
|
||||||
|
|
||||||
pReq->blockingTime = blockingTime;
|
pReq->waitTime = waitTime;
|
||||||
pReq->consumerId = tmq->consumerId;
|
pReq->consumerId = tmq->consumerId;
|
||||||
pReq->epoch = tmq->epoch;
|
pReq->epoch = tmq->epoch;
|
||||||
pReq->currentOffset = reqOffset;
|
pReq->currentOffset = reqOffset;
|
||||||
|
@ -1130,7 +1136,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
||||||
return pRspObj;
|
return pRspObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
|
int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) {
|
||||||
/*printf("call poll\n");*/
|
/*printf("call poll\n");*/
|
||||||
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
||||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||||
|
@ -1151,17 +1157,17 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
atomic_store_32(&pVg->vgSkipCnt, 0);
|
atomic_store_32(&pVg->vgSkipCnt, 0);
|
||||||
SMqPollReqV2* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
|
SMqPollReqV2* pReq = tmqBuildConsumeReqImpl(tmq, waitTime, pTopic, pVg);
|
||||||
if (pReq == NULL) {
|
if (pReq == NULL) {
|
||||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||||
/*tsem_post(&tmq->rspSem);*/
|
tsem_post(&tmq->rspSem);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
|
SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
|
||||||
if (pParam == NULL) {
|
if (pParam == NULL) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||||
/*tsem_post(&tmq->rspSem);*/
|
tsem_post(&tmq->rspSem);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pParam->tmq = tmq;
|
pParam->tmq = tmq;
|
||||||
|
@ -1176,7 +1182,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
taosMemoryFree(pParam);
|
taosMemoryFree(pParam);
|
||||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||||
/*tsem_post(&tmq->rspSem);*/
|
tsem_post(&tmq->rspSem);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1222,7 +1228,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
|
SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t waitTime, bool pollIfReset) {
|
||||||
while (1) {
|
while (1) {
|
||||||
SMqRspWrapper* rspWrapper = NULL;
|
SMqRspWrapper* rspWrapper = NULL;
|
||||||
taosGetQitem(tmq->qall, (void**)&rspWrapper);
|
taosGetQitem(tmq->qall, (void**)&rspWrapper);
|
||||||
|
@ -1261,37 +1267,41 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
|
||||||
taosFreeQitem(rspWrapper);
|
taosFreeQitem(rspWrapper);
|
||||||
if (pollIfReset && reset) {
|
if (pollIfReset && reset) {
|
||||||
tscDebug("consumer %ld reset and repoll", tmq->consumerId);
|
tscDebug("consumer %ld reset and repoll", tmq->consumerId);
|
||||||
tmqPollImpl(tmq, blockingTime);
|
tmqPollImpl(tmq, waitTime);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
|
||||||
SMqRspObj* rspObj;
|
SMqRspObj* rspObj;
|
||||||
int64_t startTime = taosGetTimestampMs();
|
int64_t startTime = taosGetTimestampMs();
|
||||||
|
|
||||||
rspObj = tmqHandleAllRsp(tmq, blocking_time, false);
|
rspObj = tmqHandleAllRsp(tmq, wait_time, false);
|
||||||
if (rspObj) {
|
if (rspObj) {
|
||||||
return (TAOS_RES*)rspObj;
|
return (TAOS_RES*)rspObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__READY) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
tmqHandleAllDelayedTask(tmq);
|
tmqHandleAllDelayedTask(tmq);
|
||||||
tmqPollImpl(tmq, blocking_time);
|
tmqPollImpl(tmq, wait_time);
|
||||||
|
|
||||||
/*tsem_wait(&tmq->rspSem);*/
|
rspObj = tmqHandleAllRsp(tmq, wait_time, false);
|
||||||
|
|
||||||
rspObj = tmqHandleAllRsp(tmq, blocking_time, false);
|
|
||||||
if (rspObj) {
|
if (rspObj) {
|
||||||
return (TAOS_RES*)rspObj;
|
return (TAOS_RES*)rspObj;
|
||||||
}
|
}
|
||||||
if (blocking_time != 0) {
|
if (wait_time != 0) {
|
||||||
int64_t endTime = taosGetTimestampMs();
|
int64_t endTime = taosGetTimestampMs();
|
||||||
if (endTime - startTime > blocking_time) {
|
int64_t leftTime = endTime - startTime;
|
||||||
|
if (leftTime > wait_time) {
|
||||||
tscDebug("consumer %ld (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
|
tscDebug("consumer %ld (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
tsem_timewait(&tmq->rspSem, leftTime * 1000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -78,7 +78,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
|
||||||
|
|
||||||
SMqDataBlkRsp rsp = {0};
|
SMqDataBlkRsp rsp = {0};
|
||||||
rsp.reqOffset = pExec->pushHandle.reqOffset;
|
rsp.reqOffset = pExec->pushHandle.reqOffset;
|
||||||
rsp.blockData = taosArrayInit(0, sizeof(int32_t));
|
rsp.blockData = taosArrayInit(0, sizeof(void*));
|
||||||
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
|
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
|
||||||
|
|
||||||
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
|
@ -210,35 +210,6 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
|
|
||||||
tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &req);
|
tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &req);
|
||||||
|
|
||||||
#if 0
|
|
||||||
void* pIter = taosHashIterate(pTq->tqPushMgr->pHash, NULL);
|
|
||||||
while (pIter != NULL) {
|
|
||||||
STqPusher* pusher = *(STqPusher**)pIter;
|
|
||||||
if (pusher->type == TQ_PUSHER_TYPE__STREAM) {
|
|
||||||
STqStreamPusher* streamPusher = (STqStreamPusher*)pusher;
|
|
||||||
// repack
|
|
||||||
STqStreamToken* token = taosMemoryMalloc(sizeof(STqStreamToken));
|
|
||||||
if (token == NULL) {
|
|
||||||
taosHashCancelIterate(pTq->tqPushMgr->pHash, pIter);
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
token->type = TQ_STREAM_TOKEN__DATA;
|
|
||||||
token->data = msg;
|
|
||||||
// set input
|
|
||||||
// exec
|
|
||||||
}
|
|
||||||
// send msg to ep
|
|
||||||
}
|
|
||||||
// iterate hash
|
|
||||||
// process all msg
|
|
||||||
// if waiting
|
|
||||||
// memcpy and send msg to fetch thread
|
|
||||||
// TODO: add reference
|
|
||||||
// if handle waiting, launch query and response to consumer
|
|
||||||
//
|
|
||||||
// if no waiting handle, return
|
|
||||||
#endif
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -377,7 +348,7 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
|
||||||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
SMqPollReqV2* pReq = pMsg->pCont;
|
SMqPollReqV2* pReq = pMsg->pCont;
|
||||||
int64_t consumerId = pReq->consumerId;
|
int64_t consumerId = pReq->consumerId;
|
||||||
int64_t waitTime = pReq->blockingTime;
|
int64_t waitTime = pReq->waitTime;
|
||||||
int32_t reqEpoch = pReq->epoch;
|
int32_t reqEpoch = pReq->epoch;
|
||||||
int64_t fetchOffset;
|
int64_t fetchOffset;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue