diff --git a/include/common/tcommon.h b/include/common/tcommon.h index a04e2afc94..e26262a8eb 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -50,7 +50,8 @@ enum { }; enum { - TMQ_MSG_TYPE__POLL_RSP = 0, + TMQ_MSG_TYPE__DUMMY = 0, + TMQ_MSG_TYPE__POLL_RSP, TMQ_MSG_TYPE__EP_RSP, }; @@ -285,4 +286,4 @@ typedef struct SSessionWindow { } #endif -#endif /*_TD_COMMON_DEF_H_*/ +#endif /*_TD_COMMON_DEF_H_*/ diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index ab955be491..0e84371199 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -28,6 +28,11 @@ #include "tqueue.h" #include "tref.h" +static int64_t perfWrite; +static int64_t perfRead; +static int64_t perfRead2; +static int64_t perfRead3; + struct tmq_list_t { int32_t cnt; int32_t tot; @@ -67,6 +72,7 @@ struct tmq_t { tmq_commit_cb* commit_cb; int32_t nextTopicIdx; int32_t waitingRequest; + int32_t readyRequest; SArray* clientTopics; // SArray STaosQueue* mqueue; // queue of tmq_message_t STaosQall* qall; @@ -118,10 +124,12 @@ typedef struct { } SMqAskEpCbParam; typedef struct { - tmq_t* tmq; - SMqClientVg* pVg; - int32_t epoch; - tsem_t rspSem; + tmq_t* tmq; + SMqClientVg* pVg; + int32_t epoch; + tsem_t rspSem; + tmq_message_t** msg; + int32_t sync; } SMqPollCbParam; typedef struct { @@ -240,6 +248,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs pTmq->pollCnt = 0; pTmq->epoch = 0; pTmq->waitingRequest = 0; + pTmq->readyRequest = 0; // set conf strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); @@ -651,6 +660,24 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { } else { atomic_sub_fetch_32(&tmq->waitingRequest, 1); } + if (pParam->sync == 1) { + /**pParam->msg = malloc(sizeof(tmq_message_t));*/ + *pParam->msg = taosAllocateQitem(sizeof(tmq_message_t)); + if (*pParam->msg) { + memcpy(*pParam->msg, pMsg->pData, sizeof(SMqRspHead)); + tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &((*pParam->msg)->consumeRsp)); + if ((*pParam->msg)->consumeRsp.numOfTopics != 0) { + pVg->currentOffset = (*pParam->msg)->consumeRsp.rspOffset; + } + int64_t begin = clock(); + taosWriteQitem(tmq->mqueue, *pParam->msg); + perfWrite += clock() - begin; + tsem_post(&pParam->rspSem); + return 0; + } + tsem_post(&pParam->rspSem); + return -1; + } /*SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp));*/ tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); @@ -671,6 +698,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { } pRsp->extra = pParam->pVg; taosWriteQitem(tmq->mqueue, pRsp); + atomic_add_fetch_32(&tmq->readyRequest, 1); /*printf("poll in queue\n");*/ /*pParam->rspMsg = (tmq_message_t*)pRsp;*/ @@ -742,7 +770,8 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { } memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->getEpRsp); - taosWriteQitem(tmq->mqueue, pRsp); + + /*taosWriteQitem(tmq->mqueue, pRsp);*/ } return 0; } @@ -866,6 +895,73 @@ void tmqClearUnhandleMsg(tmq_t* tmq) { } } +tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) { + tmq_message_t* msg = NULL; + for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { + SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); + /*if (vgStatus != TMQ_VG_STATUS__IDLE) {*/ + /*continue;*/ + /*}*/ + SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); + if (pReq == NULL) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + // TODO: out of mem + return NULL; + } + SMqPollCbParam* param = malloc(sizeof(SMqPollCbParam)); + if (param == NULL) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + // TODO: out of mem + return NULL; + } + param->tmq = tmq; + param->pVg = pVg; + param->epoch = tmq->epoch; + param->sync = 1; + param->msg = &msg; + tsem_init(¶m->rspSem, 0, 0); + SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); + pRequest->body.requestMsg = (SDataBuf){ + .pData = pReq, + .len = sizeof(SMqConsumeReq), + .handle = NULL, + }; + + SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); + sendInfo->requestObjRefId = 0; + sendInfo->param = param; + sendInfo->fp = tmqPollCb; + + int64_t transporterId = 0; + /*printf("send poll\n");*/ + atomic_add_fetch_32(&tmq->waitingRequest, 1); + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); + pVg->pollCnt++; + tmq->pollCnt++; + + int64_t begin = clock(); + tsem_wait(¶m->rspSem); + perfRead3 += clock() - begin; + tmq_message_t* nmsg = NULL; + while (1) { + int64_t begin1 = clock(); + taosReadQitem(tmq->mqueue, (void**)&nmsg); + perfRead2 += clock() - begin1; + if (nmsg == NULL) continue; + /*while (nmsg->head.mqMsgType != TMQ_MSG_TYPE__POLL_RSP) {*/ + /*taosReadQitem(tmq->mqueue, (void**)&nmsg);*/ + /*}*/ + perfRead += clock() - begin; + return nmsg; + } + } + } + return NULL; +} + int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { /*printf("call poll\n");*/ for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { @@ -891,6 +987,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { param->tmq = tmq; param->pVg = pVg; param->epoch = tmq->epoch; + param->sync = 0; SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); pRequest->body.requestMsg = (SDataBuf){ .pData = pReq, @@ -940,6 +1037,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese } if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { + atomic_sub_fetch_32(&tmq->readyRequest, 1); /*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/ if (rspMsg->head.epoch == atomic_load_32(&tmq->epoch)) { /*printf("epoch match\n");*/ @@ -969,10 +1067,38 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { tmq_message_t* rspMsg = NULL; int64_t startTime = taosGetTimestampMs(); + int64_t status = atomic_load_64(&tmq->status); + tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT); + + while (1) { + rspMsg = tmqSyncPollImpl(tmq, blocking_time); + if (rspMsg && rspMsg->consumeRsp.numOfTopics) { + return rspMsg; + } + + if (blocking_time != 0) { + int64_t endTime = taosGetTimestampMs(); + if (endTime - startTime > blocking_time) { + printf("perf write %f\n", (double)perfWrite / CLOCKS_PER_SEC); + printf("perf read %f\n", (double)perfRead / CLOCKS_PER_SEC); + printf("perf read2 %f\n", (double)perfRead2 / CLOCKS_PER_SEC); + printf("perf read3 %f\n", (double)perfRead3 / CLOCKS_PER_SEC); + return NULL; + } + } else + return NULL; + } +} + +tmq_message_t* tmq_consumer_poll_v0(tmq_t* tmq, int64_t blocking_time) { + tmq_message_t* rspMsg = NULL; + int64_t startTime = taosGetTimestampMs(); + // TODO: put into another thread or delayed queue int64_t status = atomic_load_64(&tmq->status); tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT); +#if 0 taosGetQitem(tmq->qall, (void**)&rspMsg); if (rspMsg == NULL) { taosReadAllQitems(tmq->mqueue, tmq->qall); @@ -981,23 +1107,27 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { if (rspMsg) { return rspMsg; } +#endif while (1) { /*printf("cycle\n");*/ if (atomic_load_32(&tmq->waitingRequest) == 0) { tmqPollImpl(tmq, blocking_time); } + while (atomic_load_32(&tmq->readyRequest) == 0) { + sched_yield(); + if (blocking_time != 0) { + int64_t endTime = taosGetTimestampMs(); + if (endTime - startTime > blocking_time) { + return NULL; + } + } + } taosReadAllQitems(tmq->mqueue, tmq->qall); rspMsg = tmqHandleAllRsp(tmq, blocking_time, true); if (rspMsg) { return rspMsg; } - if (blocking_time != 0) { - int64_t endTime = taosGetTimestampMs(); - if (endTime - startTime > blocking_time) { - return NULL; - } - } } } @@ -1135,6 +1265,7 @@ void tmq_message_destroy(tmq_message_t* tmq_message) { if (tmq_message == NULL) return; SMqConsumeRsp* pRsp = &tmq_message->consumeRsp; tDeleteSMqConsumeRsp(pRsp); + /*free(tmq_message);*/ taosFreeQitem(tmq_message); }