From 21e5ddbb8d708f2dc955586a1165ef4d78721a26 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 28 Feb 2022 20:46:23 +0800 Subject: [PATCH 01/11] refactor tmq msg handle --- include/client/taos.h | 6 +- include/common/tmsg.h | 20 +- source/client/src/tmq.c | 375 ++++++++++++++++------ source/dnode/mgmt/impl/src/dndTransport.c | 1 - source/dnode/mnode/impl/src/mndTopic.c | 85 +---- source/dnode/vnode/inc/tq.h | 2 +- source/dnode/vnode/inc/vnode.h | 1 - source/dnode/vnode/src/inc/tqInt.h | 17 +- source/dnode/vnode/src/tq/tq.c | 189 ++--------- source/dnode/vnode/src/vnd/vnodeWrite.c | 2 +- 10 files changed, 325 insertions(+), 373 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 2c8135c8ff..8b1517c6ff 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -224,10 +224,8 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); /* ------------------------TMQ CONSUMER INTERFACE------------------------ */ DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list); -#if 0 -DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq); -DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics); -#endif +DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq); +DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time); DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq); #if 0 diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ae3586e735..5d989421f6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1380,8 +1380,6 @@ typedef struct SMqCMGetSubEpReq { char cgroup[TSDB_CONSUMER_GROUP_LEN]; } SMqCMGetSubEpReq; -#pragma pack(pop) - static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pMsg->contLen); @@ -1851,6 +1849,12 @@ typedef struct { SMqTbData* tbData; } SMqTopicData; +typedef struct { + int8_t mqMsgType; + int32_t code; + int32_t epoch; +} SMqRspHead; + typedef struct { int64_t consumerId; SSchemaWrapper* schemas; @@ -1867,6 +1871,7 @@ typedef struct { int64_t consumerId; int64_t blockingTime; + int32_t epoch; char cgroup[TSDB_CONSUMER_GROUP_LEN]; int64_t currentOffset; @@ -1886,11 +1891,18 @@ typedef struct { typedef struct { int64_t consumerId; - int32_t epoch; char cgroup[TSDB_CONSUMER_GROUP_LEN]; SArray* topics; // SArray } SMqCMGetSubEpRsp; +struct tmq_message_t { + SMqRspHead head; + union { + SMqConsumeRsp consumeRsp; + SMqCMGetSubEpRsp getEpRsp; + }; +}; + static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); } static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { @@ -1972,6 +1984,8 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p return buf; } +#pragma pack(pop) + #ifdef __cplusplus } #endif diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 9a1025c4bd..5b4afda923 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -26,6 +26,7 @@ #include "tmsgtype.h" #include "tnote.h" #include "tpagedbuf.h" +#include "tqueue.h" #include "tref.h" struct tmq_list_t { @@ -59,22 +60,34 @@ struct tmq_t { char groupId[256]; char clientId[256]; int8_t autoCommit; - SRWLatch lock; int64_t consumerId; int32_t epoch; int32_t resetOffsetCfg; int64_t status; - tsem_t rspSem; STscObj* pTscObj; tmq_commit_cb* commit_cb; int32_t nextTopicIdx; SArray* clientTopics; // SArray + STaosQueue* mqueue; // queue of tmq_message_t + STaosQall* qall; + SRWLatch pollLock; // stat int64_t pollCnt; }; -struct tmq_message_t { - SMqConsumeRsp rsp; +enum { + TMQ_MSG_TYPE__POLL_RSP = 0, + TMQ_MSG_TYPE__EP_RSP, +}; + +enum { + TMQ_VG_STATUS__IDLE = 0, + TMQ_VG_STATUS__WAIT, +}; + +enum { + TMQ_CONSUMER_STATUS__INIT = 0, + TMQ_CONSUMER_STATUS__READY, }; typedef struct { @@ -84,6 +97,7 @@ typedef struct { int64_t currentOffset; // connection info int32_t vgId; + int32_t vgStatus; SEpSet epSet; } SMqClientVg; @@ -105,15 +119,16 @@ typedef struct { typedef struct { tmq_t* tmq; - int32_t wait; + int32_t sync; + tsem_t rspSem; } SMqAskEpCbParam; typedef struct { - tmq_t* tmq; - SMqClientVg* pVg; - tmq_message_t** retMsg; - tsem_t rspSem; -} SMqConsumeCbParam; + tmq_t* tmq; + SMqClientVg* pVg; + tmq_message_t* rspMsg; + tsem_t rspSem; +} SMqPollCbParam; typedef struct { tmq_t* tmq; @@ -210,6 +225,22 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { return 0; } +tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { + if (*topics == NULL) { + *topics = tmq_list_new(); + } + for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { + SMqClientTopic* topic = taosArrayGetP(tmq->clientTopics, i); + tmq_list_append(*topics, strdup(topic->topicName)); + } + return TMQ_RESP_ERR__SUCCESS; +} + +tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) { + tmq_list_t* lst = tmq_list_new(); + return tmq_subscribe(tmq, lst); +} + tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tmq_t* pTmq = calloc(sizeof(tmq_t), 1); if (pTmq == NULL) { @@ -219,7 +250,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs pTmq->status = 0; pTmq->pollCnt = 0; pTmq->epoch = 0; - taosInitRWLatch(&pTmq->lock); + taosInitRWLatch(&pTmq->pollLock); // set conf strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); @@ -227,9 +258,11 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs pTmq->commit_cb = conf->commit_cb; pTmq->resetOffsetCfg = conf->resetOffset; - tsem_init(&pTmq->rspSem, 0, 0); pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1); pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); + + pTmq->mqueue = taosOpenQueue(); + pTmq->qall = taosAllocateQall(); return pTmq; } @@ -291,7 +324,11 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in pParam->tmq = tmq; tsem_init(&pParam->rspSem, 0, 0); - pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; + pRequest->body.requestMsg = (SDataBuf){ + .pData = buf, + .len = tlen, + .handle = NULL, + }; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); sendInfo->param = pParam; @@ -366,10 +403,17 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { tscError("failed to malloc request"); } - SMqSubscribeCbParam param = {.rspErr = TMQ_RESP_ERR__SUCCESS, .tmq = tmq}; + SMqSubscribeCbParam param = { + .rspErr = TMQ_RESP_ERR__SUCCESS, + .tmq = tmq, + }; tsem_init(¶m.rspSem, 0, 0); - pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL}; + pRequest->body.requestMsg = (SDataBuf){ + .pData = buf, + .len = tlen, + .handle = NULL, + }; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); sendInfo->param = ¶m; @@ -392,36 +436,6 @@ _return: void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { conf->commit_cb = cb; } -SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) { - tmq_t* pTmq = (void*)param; - SArray* pArray = taosArrayInit(0, sizeof(SKv)); - if (pArray == NULL) { - return NULL; - } - SKv kv = {0}; - kv.key = HEARTBEAT_KEY_MQ_TMP; - - SMqHbMsg* pMqHb = malloc(sizeof(SMqHbMsg)); - if (pMqHb == NULL) { - return pArray; - } - pMqHb->consumerId = connKey.connId; - SArray* clientTopics = pTmq->clientTopics; - int sz = taosArrayGetSize(clientTopics); - for (int i = 0; i < sz; i++) { - SMqClientTopic* pCTopic = taosArrayGet(clientTopics, i); - /*if (pCTopic->vgId == -1) {*/ - /*pMqHb->status = 1;*/ - /*break;*/ - /*}*/ - } - kv.value = pMqHb; - kv.valueLen = sizeof(SMqHbMsg); - taosArrayPush(pArray, &kv); - - return pArray; -} - TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) { STscObj* pTscObj = (STscObj*)taos; SRequestObj* pRequest = NULL; @@ -579,7 +593,7 @@ void tmqShowMsg(tmq_message_t* tmq_message) { static bool noPrintSchema; char pBuf[128]; - SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message; + SMqConsumeRsp* pRsp = &tmq_message->consumeRsp; int32_t colNum = pRsp->schemas->nCols; if (!noPrintSchema) { printf("|"); @@ -619,17 +633,16 @@ void tmqShowMsg(tmq_message_t* tmq_message) { } int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { - SMqConsumeCbParam* pParam = (SMqConsumeCbParam*)param; - SMqClientVg* pVg = pParam->pVg; + SMqPollCbParam* pParam = (SMqPollCbParam*)param; + SMqClientVg* pVg = pParam->pVg; if (code != 0) { printf("msg discard\n"); - tsem_post(&pParam->rspSem); return 0; } SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp)); if (pRsp == NULL) { - tsem_post(&pParam->rspSem); + taosWUnLockLatch(&pParam->tmq->pollLock); return -1; } tDecodeSMqConsumeRsp(pMsg->pData, pRsp); @@ -637,76 +650,80 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { if (pRsp->numOfTopics == 0) { /*printf("no data\n");*/ free(pRsp); - tsem_post(&pParam->rspSem); + taosWUnLockLatch(&pParam->tmq->pollLock); return 0; } - *pParam->retMsg = (tmq_message_t*)pRsp; + pParam->rspMsg = (tmq_message_t*)pRsp; pVg->currentOffset = pRsp->rspOffset; /*printf("rsp offset: %ld\n", rsp.rspOffset);*/ /*printf("-----msg begin----\n");*/ - tsem_post(&pParam->rspSem); + taosWUnLockLatch(&pParam->tmq->pollLock); /*printf("\n-----msg end------\n");*/ return 0; } +bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { + bool set = false; + int32_t sz = taosArrayGetSize(pRsp->topics); + if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics); + tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); + for (int32_t i = 0; i < sz; i++) { + SMqClientTopic topic = {0}; + SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i); + topic.topicName = strdup(pTopicEp->topic); + int32_t vgSz = taosArrayGetSize(pTopicEp->vgs); + topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg)); + for (int32_t j = 0; j < vgSz; j++) { + SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); + SMqClientVg clientVg = { + .pollCnt = 0, + .currentOffset = pVgEp->offset, + .vgId = pVgEp->vgId, + .epSet = pVgEp->epSet, + .vgStatus = TMQ_VG_STATUS__IDLE, + }; + taosArrayPush(topic.vgs, &clientVg); + set = true; + } + taosArrayPush(tmq->clientTopics, &topic); + } + atomic_store_32(&tmq->epoch, epoch); + return set; +} + int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; tmq_t* tmq = pParam->tmq; if (code != 0) { - printf("get topic endpoint error, not ready, wait:%d\n", pParam->wait); - if (pParam->wait) { - tsem_post(&tmq->rspSem); + printf("get topic endpoint error, not ready, wait:%d\n", pParam->sync); + if (pParam->sync) { + tsem_post(&pParam->rspSem); } return 0; } tscDebug("tmq ask ep cb called"); - bool set = false; - SMqCMGetSubEpRsp rsp; - tDecodeSMqCMGetSubEpRsp(pMsg->pData, &rsp); - int32_t sz = taosArrayGetSize(rsp.topics); - // TODO: lock - /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/ - /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/ - if (rsp.epoch != tmq->epoch) { - // TODO - if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics); - tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); - for (int32_t i = 0; i < sz; i++) { - SMqClientTopic topic = {0}; - SMqSubTopicEp* pTopicEp = taosArrayGet(rsp.topics, i); - topic.topicName = strdup(pTopicEp->topic); - int32_t vgSz = taosArrayGetSize(pTopicEp->vgs); - topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg)); - for (int32_t j = 0; j < vgSz; j++) { - SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); - // clang-format off - SMqClientVg clientVg = { - .pollCnt = 0, - .currentOffset = pVgEp->offset, - .vgId = pVgEp->vgId, - .epSet = pVgEp->epSet - }; - // clang-format on - taosArrayPush(topic.vgs, &clientVg); - set = true; - } - taosArrayPush(tmq->clientTopics, &topic); + if (pParam->sync) { + SMqRspHead* head = pMsg->pData; + SMqCMGetSubEpRsp rsp; + tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); + /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/ + /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/ + int32_t epoch = atomic_load_32(&tmq->epoch); + if (head->epoch > epoch && tmqUpdateEp(tmq, head->epoch, &rsp)) { + atomic_store_64(&tmq->status, TMQ_CONSUMER_STATUS__READY); } - tmq->epoch = rsp.epoch; + tsem_post(&pParam->rspSem); + tDeleteSMqCMGetSubEpRsp(&rsp); + } else { + tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); + memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); + tDecodeSMqCMGetSubEpRsp(pMsg->pData, &pRsp->getEpRsp); + taosWriteQitem(tmq->mqueue, pRsp); } - if (set) { - atomic_store_64(&tmq->status, 1); - } - // unlock - /*tsem_post(&tmq->rspSem);*/ - if (pParam->wait) { - tsem_post(&tmq->rspSem); - } - tDeleteSMqCMGetSubEpRsp(&rsp); return 0; } -int32_t tmqAskEp(tmq_t* tmq, bool wait) { +int32_t tmqAskEp(tmq_t* tmq, bool sync) { int32_t tlen = sizeof(SMqCMGetSubEpReq); SMqCMGetSubEpReq* buf = malloc(tlen); if (buf == NULL) { @@ -723,7 +740,11 @@ int32_t tmqAskEp(tmq_t* tmq, bool wait) { goto END; } - pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; + pRequest->body.requestMsg = (SDataBuf){ + .pData = buf, + .len = tlen, + .handle = NULL, + }; SMqAskEpCbParam* pParam = malloc(sizeof(SMqAskEpCbParam)); if (pParam == NULL) { @@ -731,7 +752,8 @@ int32_t tmqAskEp(tmq_t* tmq, bool wait) { goto END; } pParam->tmq = tmq; - pParam->wait = wait; + pParam->sync = sync; + tsem_init(&pParam->rspSem, 0, 0); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); sendInfo->requestObjRefId = 0; @@ -744,7 +766,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool wait) { asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); END: - if (wait) tsem_wait(&tmq->rspSem); + if (sync) tsem_wait(&pParam->rspSem); return 0; } @@ -792,6 +814,7 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClie pReq->blockingTime = blocking_time; pReq->consumerId = tmq->consumerId; + pReq->epoch = tmq->epoch; pReq->currentOffset = reqOffset; pReq->head.vgId = htonl(pVg->vgId); @@ -799,11 +822,146 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClie return pReq; } -tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { - tmq_message_t* tmq_message = NULL; +void tmqClearUnhandleMsg(tmq_t* tmq) { + tmq_message_t* msg; + while (1) { + taosGetQitem(tmq->qall, (void**)&msg); + if (msg) + taosFreeQitem(msg); + else + break; + } + taosReadAllQitems(tmq->mqueue, tmq->qall); + while (1) { + taosGetQitem(tmq->qall, (void**)&msg); + if (msg) + taosFreeQitem(msg); + else + break; + } +} + +int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { + for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { + SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); + if (vgStatus != TMQ_VG_STATUS__IDLE) { + continue; + } + SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); + if (pReq == NULL) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + // TODO: out of mem + return -1; + } + SMqPollCbParam* param = malloc(sizeof(SMqPollCbParam)); + if (param == NULL) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + // TODO: out of mem + return -1; + } + param->tmq = tmq; + param->pVg = pVg; + SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); + pRequest->body.requestMsg = (SDataBuf){ + .pData = pReq, + .len = sizeof(SMqConsumeReq), + .handle = NULL, + }; + + SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); + sendInfo->requestObjRefId = 0; + sendInfo->param = param; + sendInfo->fp = tmqPollCb; + + int64_t transporterId = 0; + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); + pVg->pollCnt++; + tmq->pollCnt++; + } + } + return 0; +} + +void tmqFetchLeftRes(tmq_t* tmq, tmq_message_t** pRspMsg) { + taosGetQitem(tmq->qall, (void**)pRspMsg); + if (pRspMsg == NULL) { + taosReadAllQitems(tmq->mqueue, tmq->qall); + taosGetQitem(tmq->qall, (void**)pRspMsg); + } +} + +// return +int32_t tmqHandleRes(tmq_t* tmq, tmq_message_t* rspMsg, bool* pReset) { + if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__EP_RSP) { + if (rspMsg->head.epoch > atomic_load_32(&tmq->epoch)) { + tmqUpdateEp(tmq, rspMsg->head.epoch, &rspMsg->getEpRsp); + tmqClearUnhandleMsg(tmq); + *pReset = true; + } else { + *pReset = false; + } + } else { + *pReset = false; + return -1; + } + return 0; +} + +tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { + tmq_message_t* rspMsg = NULL; + int64_t startTime = taosGetTimestampMs(); + + // TODO: put into another thread or delayed queue int64_t status = atomic_load_64(&tmq->status); - tmqAskEp(tmq, status == 0); + tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT); + + tmqFetchLeftRes(tmq, &rspMsg); + + taosGetQitem(tmq->qall, (void**)&rspMsg); + if (rspMsg == NULL) { + taosReadAllQitems(tmq->mqueue, tmq->qall); + } + + while (1) { + taosGetQitem(tmq->qall, (void**)&rspMsg); + if (rspMsg == NULL) break; + if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { + return rspMsg; + } + bool reset = false; + tmqHandleRes(tmq, rspMsg, &reset); + taosFreeQitem(rspMsg); + } + + tmqPollImpl(tmq, blocking_time); + + while (1) { + taosReadAllQitems(tmq->mqueue, tmq->qall); + while (1) { + taosGetQitem(tmq->qall, (void**)&rspMsg); + if (rspMsg == NULL) break; + + if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { + return rspMsg; + } else { + bool reset = false; + tmqHandleRes(tmq, rspMsg, &reset); + taosFreeQitem(rspMsg); + if (reset) tmqPollImpl(tmq, blocking_time); + } + } + int64_t endTime = taosGetTimestampMs(); + if (endTime - startTime > blocking_time) { + return NULL; + } + } +} + +#if 0 if (blocking_time <= 0) blocking_time = 1; if (blocking_time > 1000) blocking_time = 1000; @@ -835,7 +993,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { return NULL; } - SMqConsumeCbParam* param = malloc(sizeof(SMqConsumeCbParam)); + SMqPollCbParam* param = malloc(sizeof(SMqPollCbParam)); if (param == NULL) { ASSERT(false); usleep(blocking_time * 1000); @@ -847,7 +1005,11 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { tsem_init(¶m->rspSem, 0, 0); SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); - pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq), .handle = NULL}; + pRequest->body.requestMsg = (SDataBuf){ + .pData = pReq, + .len = sizeof(SMqConsumeReq), + .handle = NULL, + }; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); sendInfo->requestObjRefId = 0; @@ -887,6 +1049,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { /*return pRequest;*/ } +#endif #if 0 tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) { diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 0aae145d2f..b68bed8789 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -114,7 +114,6 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_MQ_COMMIT_OFFSET)] = dndProcessMnodeWriteMsg; - /*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/ pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_SUB_EP)] = dndProcessMnodeReadMsg; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index d2318009d5..9faabc3874 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -72,7 +72,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { if (pRaw == NULL) goto TOPIC_ENCODE_OVER; int32_t dataPos = 0; - SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, TOPIC_ENCODE_OVER); + SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER); SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER); SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER); @@ -121,7 +121,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { int32_t len; int32_t dataPos = 0; - SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, TOPIC_DECODE_OVER); + SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER); SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER); SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER); @@ -206,7 +206,7 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) { SName name = {0}; tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - char db[TSDB_TABLE_FNAME_LEN] = {0}; + char db[TSDB_TOPIC_FNAME_LEN] = {0}; tNameGetFullDbName(&name, db); return mndAcquireDb(pMnode, db); @@ -223,7 +223,7 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq pDrop->head.contLen = htonl(contLen); pDrop->head.vgId = htonl(pVgroup->vgId); - memcpy(pDrop->name, pTopic->name, TSDB_TABLE_FNAME_LEN); + memcpy(pDrop->name, pTopic->name, TSDB_TOPIC_FNAME_LEN); pDrop->tuid = htobe64(pTopic->uid); return pDrop; @@ -343,6 +343,7 @@ CREATE_TOPIC_OVER: } static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic) { + // TODO: cannot drop when subscribed STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg); if (pTrans == NULL) { mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); @@ -408,76 +409,7 @@ static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pRsp) { return 0; } -static int32_t mndProcessTopicMetaReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - STableInfoReq infoReq = {0}; - - if (tSerializeSTableInfoReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &infoReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - mDebug("topic:%s, start to retrieve meta", infoReq.tbName); - #if 0 - SDbObj *pDb = mndAcquireDbByTopic(pMnode, pInfo->tableFname); - if (pDb == NULL) { - terrno = TSDB_CODE_MND_DB_NOT_SELECTED; - mError("topic:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr()); - return -1; - } - - STopicObj *pTopic = mndAcquireTopic(pMnode, pInfo->tableFname); - if (pTopic == NULL) { - mndReleaseDb(pMnode, pDb); - terrno = TSDB_CODE_MND_INVALID_TOPIC; - mError("topic:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); - return -1; - } - - taosRLockLatch(&pTopic->lock); - int32_t totalCols = pTopic->numOfColumns + pTopic->numOfTags; - int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema); - - STableMetaRsp *pMeta = rpcMallocCont(contLen); - if (pMeta == NULL) { - taosRUnLockLatch(&pTopic->lock); - mndReleaseDb(pMnode, pDb); - mndReleaseTopic(pMnode, pTopic); - terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("topic:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); - return -1; - } - - memcpy(pMeta->topicFname, pTopic->name, TSDB_TABLE_FNAME_LEN); - pMeta->numOfTags = htonl(pTopic->numOfTags); - pMeta->numOfColumns = htonl(pTopic->numOfColumns); - pMeta->precision = pDb->cfg.precision; - pMeta->tableType = TSDB_SUPER_TABLE; - pMeta->update = pDb->cfg.update; - pMeta->sversion = htonl(pTopic->version); - pMeta->tuid = htonl(pTopic->uid); - - for (int32_t i = 0; i < totalCols; ++i) { - SSchema *pSchema = &pMeta->pSchemas[i]; - SSchema *pSrcSchema = &pTopic->pSchema[i]; - memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); - pSchema->type = pSrcSchema->type; - pSchema->colId = htonl(pSrcSchema->colId); - pSchema->bytes = htonl(pSrcSchema->bytes); - } - taosRUnLockLatch(&pTopic->lock); - mndReleaseDb(pMnode, pDb); - mndReleaseTopic(pMnode, pTopic); - - pReq->pCont = pMeta; - pReq->contLen = contLen; - - mDebug("topic:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pTopic->numOfColumns, pTopic->numOfTags); -#endif - return 0; -} - static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) { SSdb *pSdb = pMnode->pSdb; SDbObj *pDb = mndAcquireDb(pMnode, dbName); @@ -504,6 +436,7 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo mndReleaseDb(pMnode, pDb); return 0; } +#endif static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; @@ -571,7 +504,7 @@ static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, in if (pTopic->dbUid != pDb->uid) { if (strncmp(pTopic->name, prefix, prefixLen) != 0) { - mError("Inconsistent table data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid); + mError("Inconsistent topic data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid); } sdbRelease(pSdb, pTopic); @@ -580,8 +513,8 @@ static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, in cols = 0; - char topicName[TSDB_TABLE_NAME_LEN] = {0}; - tstrncpy(topicName, pTopic->name + prefixLen, TSDB_TABLE_NAME_LEN); + char topicName[TSDB_TOPIC_NAME_LEN] = {0}; + tstrncpy(topicName, pTopic->name + prefixLen, TSDB_TOPIC_NAME_LEN); pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; STR_TO_VARSTR(pWrite, topicName); cols++; diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index a516f423bb..d8c9d11ce9 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -52,7 +52,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl void tqClose(STQ*); // required by vnode -int tqPushMsg(STQ*, void* msg, int64_t version); +int tqPushMsg(STQ*, void* msg, tmsg_t msgType, int64_t version); int tqCommit(STQ*); int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 3a06674e3c..7c8f97bb8b 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -221,7 +221,6 @@ static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const S for (int i = 0; i < taosArrayGetSize(tbUidList); i++) { int64_t *pKey = (int64_t *)taosArrayGet(tbUidList, i); taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0); - // pHandle->tbUid = tbUid; } return 0; } diff --git a/source/dnode/vnode/src/inc/tqInt.h b/source/dnode/vnode/src/inc/tqInt.h index 344ad992f0..a801b6f7ae 100644 --- a/source/dnode/vnode/src/inc/tqInt.h +++ b/source/dnode/vnode/src/inc/tqInt.h @@ -79,19 +79,19 @@ extern int32_t tqDebugFlag; // 4096 - 4080 #define TQ_IDX_PAGE_HEAD_SIZE 16 -#define TQ_ACTION_CONST 0 -#define TQ_ACTION_INUSE 1 +#define TQ_ACTION_CONST 0 +#define TQ_ACTION_INUSE 1 #define TQ_ACTION_INUSE_CONT 2 -#define TQ_ACTION_INTXN 3 +#define TQ_ACTION_INTXN 3 #define TQ_SVER 0 // TODO: inplace mode is not implemented #define TQ_UPDATE_INPLACE 0 -#define TQ_UPDATE_APPEND 1 +#define TQ_UPDATE_APPEND 1 #define TQ_DUP_INTXN_REWRITE 0 -#define TQ_DUP_INTXN_REJECT 2 +#define TQ_DUP_INTXN_REJECT 2 static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; } @@ -160,7 +160,7 @@ struct STQ { STqMemRef tqMemRef; STqMetaStore* tqMeta; SWal* pWal; - SMeta* pMeta; + SMeta* pVnodeMeta; }; typedef struct { @@ -190,9 +190,6 @@ typedef struct { char* logicalPlan; char* physicalPlan; char* qmsg; - int64_t persistedOffset; - int64_t committedOffset; - int64_t currentOffset; STqBuffer buffer; SWalReadHandle* pReadhandle; } STqTopic; @@ -201,7 +198,7 @@ typedef struct { int64_t consumerId; int64_t epoch; char cgroup[TSDB_TOPIC_FNAME_LEN]; - SArray* topics; // SArray + SArray* topics; // SArray } STqConsumer; int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ac9dde3597..7c8c96fb54 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -42,7 +42,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl pTq->path = strdup(path); pTq->tqConfig = tqConfig; pTq->pWal = pWal; - pTq->pMeta = pMeta; + pTq->pVnodeMeta = pMeta; #if 0 pTq->tqMemRef.pAllocatorFactory = allocFac; pTq->tqMemRef.pAllocator = allocFac->create(allocFac); @@ -71,9 +71,11 @@ void tqClose(STQ* pTq) { // TODO } -int tqPushMsg(STQ* pTq, void* p, int64_t version) { - // add reference - // judge and launch new query +int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) { + // TODO: add reference + // if handle waiting, launch query and response to consumer + // + // if no waiting handle, return return 0; } @@ -101,9 +103,9 @@ static FORCE_INLINE int32_t tEncodeSTqTopic(void** buf, const STqTopic* pTopic) /*tlen += taosEncodeString(buf, pTopic->logicalPlan);*/ /*tlen += taosEncodeString(buf, pTopic->physicalPlan);*/ tlen += taosEncodeString(buf, pTopic->qmsg); - tlen += taosEncodeFixedI64(buf, pTopic->persistedOffset); - tlen += taosEncodeFixedI64(buf, pTopic->committedOffset); - tlen += taosEncodeFixedI64(buf, pTopic->currentOffset); + /*tlen += taosEncodeFixedI64(buf, pTopic->persistedOffset);*/ + /*tlen += taosEncodeFixedI64(buf, pTopic->committedOffset);*/ + /*tlen += taosEncodeFixedI64(buf, pTopic->currentOffset);*/ return tlen; } @@ -113,9 +115,9 @@ static FORCE_INLINE const void* tDecodeSTqTopic(const void* buf, STqTopic* pTopi /*buf = taosDecodeString(buf, &pTopic->logicalPlan);*/ /*buf = taosDecodeString(buf, &pTopic->physicalPlan);*/ buf = taosDecodeString(buf, &pTopic->qmsg); - buf = taosDecodeFixedI64(buf, &pTopic->persistedOffset); - buf = taosDecodeFixedI64(buf, &pTopic->committedOffset); - buf = taosDecodeFixedI64(buf, &pTopic->currentOffset); + /*buf = taosDecodeFixedI64(buf, &pTopic->persistedOffset);*/ + /*buf = taosDecodeFixedI64(buf, &pTopic->committedOffset);*/ + /*buf = taosDecodeFixedI64(buf, &pTopic->currentOffset);*/ return buf; } @@ -194,8 +196,8 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu } for (int j = 0; j < TQ_BUFFER_SIZE; j++) { pTopic->buffer.output[j].status = 0; - STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); - SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pMeta}; + STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); + SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pVnodeMeta}; pTopic->buffer.output[j].pReadHandle = pReadHandle; pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle); } @@ -243,7 +245,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { // TODO: no more log, set timer to wait blocking time // if data inserted during waiting, launch query and - // rsponse to user + // response to user break; } pHead = pTopic->pReadhandle->pHead; @@ -268,7 +270,6 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { taosArrayPush(pRes, pDataBlock); rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper; rsp.rspOffset = fetchOffset; - pTopic->currentOffset = fetchOffset; rsp.numOfTopics = 1; rsp.pBlockData = pRes; @@ -312,158 +313,6 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { return 0; } -#if 0 -int32_t tqProcessConsumeReqV0(STQ* pTq, SRpcMsg* pMsg) { - SMqConsumeReq* pReq = pMsg->pCont; - int64_t reqId = pReq->reqId; - int64_t consumerId = pReq->consumerId; - int64_t fetchOffset = pReq->offset; - int64_t blockingTime = pReq->blockingTime; - - SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL}; - - /*printf("vg %d get consume req\n", pReq->head.vgId);*/ - - STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); - if (pConsumer == NULL) { - pMsg->pCont = NULL; - pMsg->contLen = 0; - pMsg->code = -1; - rpcSendResponse(pMsg); - return 0; - } - int sz = taosArrayGetSize(pConsumer->topics); - - for (int i = 0; i < sz; i++) { - STqTopic* pTopic = taosArrayGet(pConsumer->topics, i); - // TODO: support multiple topic in one req - if (strcmp(pTopic->topicName, pReq->topic) != 0) { - ASSERT(false); - continue; - } - - if (pReq->reqType == TMQ_REQ_TYPE_COMMIT_ONLY) { - pTopic->committedOffset = pReq->offset; - pMsg->pCont = NULL; - pMsg->contLen = 0; - pMsg->code = 0; - rpcSendResponse(pMsg); - return 0; - } - - if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) { - pTopic->committedOffset = pReq->offset - 1; - } - - rsp.committedOffset = pTopic->committedOffset; - rsp.reqOffset = pReq->offset; - rsp.skipLogNum = 0; - - if (fetchOffset <= pTopic->committedOffset) { - fetchOffset = pTopic->committedOffset + 1; - } - /*printf("vg %d fetch Offset %ld\n", pReq->head.vgId, fetchOffset);*/ - int8_t pos; - int8_t skip = 0; - SWalHead* pHead; - while (1) { - pos = fetchOffset % TQ_BUFFER_SIZE; - skip = atomic_val_compare_exchange_8(&pTopic->buffer.output[pos].status, 0, 1); - if (skip == 1) { - // do nothing - break; - } - if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { - printf("read offset %ld\n", fetchOffset); - // check err - atomic_store_8(&pTopic->buffer.output[pos].status, 0); - skip = 1; - break; - } - // read until find TDMT_VND_SUBMIT - pHead = pTopic->pReadhandle->pHead; - if (pHead->head.msgType == TDMT_VND_SUBMIT) { - } - rsp.skipLogNum++; - if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { - printf("read offset %ld\n", fetchOffset); - atomic_store_8(&pTopic->buffer.output[pos].status, 0); - skip = 1; - break; - } - atomic_store_8(&pTopic->buffer.output[pos].status, 0); - fetchOffset++; - } - if (skip == 1) continue; - SSubmitReq* pCont = (SSubmitReq*)&pHead->head.body; - qTaskInfo_t task = pTopic->buffer.output[pos].task; - - printf("current fetch offset %ld\n", fetchOffset); - qSetStreamInput(task, pCont); - - // SArray - SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); - while (1) { - SSDataBlock* pDataBlock; - uint64_t ts; - if (qExecTask(task, &pDataBlock, &ts) < 0) { - break; - } - if (pDataBlock != NULL) { - taosArrayPush(pRes, pDataBlock); - } else { - break; - } - } - // TODO copy - rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper; - rsp.rspOffset = fetchOffset; - pTopic->currentOffset = fetchOffset; - - atomic_store_8(&pTopic->buffer.output[pos].status, 0); - - if (taosArrayGetSize(pRes) == 0) { - taosArrayDestroy(pRes); - fetchOffset++; - continue; - } else { - rsp.numOfTopics++; - } - - rsp.pBlockData = pRes; - -#if 0 - pTopic->buffer.output[pos].dst = pRes; - if (pTopic->buffer.firstOffset == -1 || pReq->offset < pTopic->buffer.firstOffset) { - pTopic->buffer.firstOffset = pReq->offset; - } - if (pTopic->buffer.lastOffset == -1 || pReq->offset > pTopic->buffer.lastOffset) { - pTopic->buffer.lastOffset = pReq->offset; - } -#endif - } - int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp); - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - pMsg->code = -1; - return -1; - } - void* abuf = buf; - tEncodeSMqConsumeRsp(&abuf, &rsp); - - if (rsp.pBlockData) { - taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock); - rsp.pBlockData = NULL; - } - - pMsg->pCont = buf; - pMsg->contLen = tlen; - pMsg->code = 0; - rpcSendResponse(pMsg); - return 0; -} -#endif - int32_t tqProcessRebReq(STQ* pTq, char* msg) { SMqMVRebReq req = {0}; tDecodeSMqMVRebReq(msg, &req); @@ -505,8 +354,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { pTopic->logicalPlan = req.logicalPlan; pTopic->physicalPlan = req.physicalPlan; pTopic->qmsg = req.qmsg; - pTopic->committedOffset = -1; - pTopic->currentOffset = -1; + /*pTopic->committedOffset = -1;*/ + /*pTopic->currentOffset = -1;*/ pTopic->buffer.firstOffset = -1; pTopic->buffer.lastOffset = -1; @@ -516,8 +365,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { } for (int i = 0; i < TQ_BUFFER_SIZE; i++) { pTopic->buffer.output[i].status = 0; - STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); - SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pMeta}; + STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); + SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pVnodeMeta}; pTopic->buffer.output[i].pReadHandle = pReadHandle; pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle); } diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index c3947da459..81eb09f48f 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -59,7 +59,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // todo: change the interface here int64_t ver; taosDecodeFixedI64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver); - if (tqPushMsg(pVnode->pTq, ptr, ver) < 0) { + if (tqPushMsg(pVnode->pTq, ptr, pMsg->msgType, ver) < 0) { // TODO: handle error } From 82c141ede82275814d7d2cabf95b25e1f712b73e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 1 Mar 2022 15:56:11 +0800 Subject: [PATCH 02/11] merge from 3.0 --- example/src/tmq.c | 2 +- include/common/tcommon.h | 5 + include/common/tmsg.h | 3 +- source/client/src/tmq.c | 148 ++++++++++++--------- source/dnode/mnode/impl/src/mndSubscribe.c | 12 +- source/dnode/mnode/impl/src/mndTopic.c | 27 ++++ source/dnode/vnode/src/tq/tq.c | 12 +- 7 files changed, 136 insertions(+), 73 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index 094fd94bfc..35c3e655d6 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -160,7 +160,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { } while (running) { - tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); + tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1000); if (tmqmessage) { msg_process(tmqmessage); tmq_message_destroy(tmqmessage); diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 37d20cdb97..a04e2afc94 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -49,6 +49,11 @@ enum { TMQ_CONF__RESET_OFFSET__NONE = -3, }; +enum { + TMQ_MSG_TYPE__POLL_RSP = 0, + TMQ_MSG_TYPE__EP_RSP, +}; + typedef struct { uint32_t numOfTables; SArray* pGroupList; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ebd4563f8c..b60f09bdf4 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1901,6 +1901,7 @@ struct tmq_message_t { SMqConsumeRsp consumeRsp; SMqCMGetSubEpRsp getEpRsp; }; + void* extra; }; static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); } @@ -1955,7 +1956,6 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pRsp->consumerId); - tlen += taosEncodeFixedI32(buf, pRsp->epoch); tlen += taosEncodeString(buf, pRsp->cgroup); int32_t sz = taosArrayGetSize(pRsp->topics); tlen += taosEncodeFixedI32(buf, sz); @@ -1968,7 +1968,6 @@ static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSu static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) { buf = taosDecodeFixedI64(buf, &pRsp->consumerId); - buf = taosDecodeFixedI32(buf, &pRsp->epoch); buf = taosDecodeStringTo(buf, pRsp->cgroup); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index f31130acf4..acb96013c9 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -69,16 +69,10 @@ struct tmq_t { SArray* clientTopics; // SArray STaosQueue* mqueue; // queue of tmq_message_t STaosQall* qall; - SRWLatch pollLock; // stat int64_t pollCnt; }; -enum { - TMQ_MSG_TYPE__POLL_RSP = 0, - TMQ_MSG_TYPE__EP_RSP, -}; - enum { TMQ_VG_STATUS__IDLE = 0, TMQ_VG_STATUS__WAIT, @@ -123,10 +117,10 @@ typedef struct { } SMqAskEpCbParam; typedef struct { - tmq_t* tmq; - SMqClientVg* pVg; - tmq_message_t* rspMsg; - tsem_t rspSem; + tmq_t* tmq; + SMqClientVg* pVg; + int32_t epoch; + tsem_t rspSem; } SMqPollCbParam; typedef struct { @@ -249,7 +243,6 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs pTmq->status = 0; pTmq->pollCnt = 0; pTmq->epoch = 0; - taosInitRWLatch(&pTmq->pollLock); // set conf strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); @@ -632,31 +625,50 @@ void tmqShowMsg(tmq_message_t* tmq_message) { } int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { + printf("recv poll\n"); SMqPollCbParam* pParam = (SMqPollCbParam*)param; SMqClientVg* pVg = pParam->pVg; + tmq_t* tmq = pParam->tmq; if (code != 0) { printf("msg discard\n"); + if (pParam->epoch == tmq->epoch) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + } return 0; } - SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp)); - if (pRsp == NULL) { - taosWUnLockLatch(&pParam->tmq->pollLock); - return -1; - } - tDecodeSMqConsumeRsp(pMsg->pData, pRsp); - /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ - if (pRsp->numOfTopics == 0) { - /*printf("no data\n");*/ - free(pRsp); - taosWUnLockLatch(&pParam->tmq->pollLock); + int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch; + int32_t tmqEpoch = atomic_load_32(&tmq->epoch); + if (msgEpoch < tmqEpoch) { + printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch); return 0; } - pParam->rspMsg = (tmq_message_t*)pRsp; - pVg->currentOffset = pRsp->rspOffset; + + if (msgEpoch != tmqEpoch) { + printf("mismatch rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch); + } + + /*SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp));*/ + tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); + if (pRsp == NULL) { + return -1; + } + memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); + tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp); + /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ + if (pRsp->consumeRsp.numOfTopics == 0) { + /*printf("no data\n");*/ + taosFreeQitem(pRsp); + return 0; + } + pRsp->extra = pParam->pVg; + taosWriteQitem(tmq->mqueue, pRsp); + printf("poll in queue\n"); + /*pParam->rspMsg = (tmq_message_t*)pRsp;*/ + /*pVg->currentOffset = pRsp->consumeRsp.rspOffset;*/ + /*printf("rsp offset: %ld\n", rsp.rspOffset);*/ /*printf("-----msg begin----\n");*/ - taosWUnLockLatch(&pParam->tmq->pollLock); /*printf("\n-----msg end------\n");*/ return 0; } @@ -715,6 +727,10 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { tDeleteSMqCMGetSubEpRsp(&rsp); } else { tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqCMGetSubEpRsp(pMsg->pData, &pRsp->getEpRsp); taosWriteQitem(tmq->mqueue, pRsp); @@ -723,6 +739,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { } int32_t tmqAskEp(tmq_t* tmq, bool sync) { + printf("ask ep sync %d\n", sync); int32_t tlen = sizeof(SMqCMGetSubEpReq); SMqCMGetSubEpReq* buf = malloc(tlen); if (buf == NULL) { @@ -842,6 +859,7 @@ void tmqClearUnhandleMsg(tmq_t* tmq) { } int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { + printf("call poll\n"); for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { @@ -864,6 +882,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { } param->tmq = tmq; param->pVg = pVg; + param->epoch = tmq->epoch; SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); pRequest->body.requestMsg = (SDataBuf){ .pData = pReq, @@ -877,6 +896,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { sendInfo->fp = tmqPollCb; int64_t transporterId = 0; + printf("send poll\n"); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); pVg->pollCnt++; tmq->pollCnt++; @@ -885,17 +905,10 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { return 0; } -void tmqFetchLeftRes(tmq_t* tmq, tmq_message_t** pRspMsg) { - taosGetQitem(tmq->qall, (void**)pRspMsg); - if (pRspMsg == NULL) { - taosReadAllQitems(tmq->mqueue, tmq->qall); - taosGetQitem(tmq->qall, (void**)pRspMsg); - } -} - // return int32_t tmqHandleRes(tmq_t* tmq, tmq_message_t* rspMsg, bool* pReset) { if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__EP_RSP) { + printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch); if (rspMsg->head.epoch > atomic_load_32(&tmq->epoch)) { tmqUpdateEp(tmq, rspMsg->head.epoch, &rspMsg->getEpRsp); tmqClearUnhandleMsg(tmq); @@ -904,12 +917,45 @@ int32_t tmqHandleRes(tmq_t* tmq, tmq_message_t* rspMsg, bool* pReset) { *pReset = false; } } else { - *pReset = false; return -1; } return 0; } +tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) { + while (1) { + tmq_message_t* rspMsg = NULL; + taosGetQitem(tmq->qall, (void**)&rspMsg); + if (rspMsg == NULL) { + break; + } + + if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { + printf("handle poll rsp %d\n", rspMsg->head.mqMsgType); + if (rspMsg->head.epoch == atomic_load_32(&tmq->epoch)) { + printf("epoch match\n"); + SMqClientVg* pVg = rspMsg->extra; + pVg->currentOffset = rspMsg->consumeRsp.rspOffset; + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + return rspMsg; + } else { + printf("epoch mismatch\n"); + taosFreeQitem(rspMsg); + } + } else { + printf("handle ep rsp %d\n", rspMsg->head.mqMsgType); + bool reset = false; + tmqHandleRes(tmq, rspMsg, &reset); + taosFreeQitem(rspMsg); + if (pollIfReset && reset) { + printf("reset and repoll\n"); + tmqPollImpl(tmq, blockingTime); + } + } + } + return NULL; +} + tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { tmq_message_t* rspMsg = NULL; int64_t startTime = taosGetTimestampMs(); @@ -918,43 +964,21 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { int64_t status = atomic_load_64(&tmq->status); tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT); - tmqFetchLeftRes(tmq, &rspMsg); - taosGetQitem(tmq->qall, (void**)&rspMsg); if (rspMsg == NULL) { taosReadAllQitems(tmq->mqueue, tmq->qall); } - - while (1) { - taosGetQitem(tmq->qall, (void**)&rspMsg); - if (rspMsg == NULL) break; - if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { - return rspMsg; - } - bool reset = false; - tmqHandleRes(tmq, rspMsg, &reset); - taosFreeQitem(rspMsg); - } + tmqHandleAllRsp(tmq, blocking_time, false); tmqPollImpl(tmq, blocking_time); while (1) { taosReadAllQitems(tmq->mqueue, tmq->qall); - while (1) { - taosGetQitem(tmq->qall, (void**)&rspMsg); - if (rspMsg == NULL) break; - - if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { - return rspMsg; - } else { - bool reset = false; - tmqHandleRes(tmq, rspMsg, &reset); - taosFreeQitem(rspMsg); - if (reset) tmqPollImpl(tmq, blocking_time); - } - } + tmqHandleAllRsp(tmq, blocking_time, true); int64_t endTime = taosGetTimestampMs(); if (endTime - startTime > blocking_time) { + printf("cycle end\n"); + usleep(1000 * 1000); return NULL; } } @@ -1092,9 +1116,9 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v void tmq_message_destroy(tmq_message_t* tmq_message) { if (tmq_message == NULL) return; - SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message; + SMqConsumeRsp* pRsp = &tmq_message->consumeRsp; tDeleteSMqConsumeRsp(pRsp); - free(tmq_message); + taosFreeQitem(tmq_message); } tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { return TMQ_RESP_ERR__SUCCESS; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 2ea157fea4..60533b979c 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -270,9 +270,8 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { strcpy(rsp.cgroup, pReq->cgroup); rsp.consumerId = consumerId; - rsp.epoch = pConsumer->epoch; - if (epoch != rsp.epoch) { - mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, rsp.epoch); + if (epoch != pConsumer->epoch) { + mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, pConsumer->epoch); SArray *pTopics = pConsumer->currentTopics; int32_t sz = taosArrayGetSize(pTopics); rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); @@ -308,13 +307,16 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { mndReleaseSubscribe(pMnode, pSub); } } - int32_t tlen = tEncodeSMqCMGetSubEpRsp(NULL, &rsp); + int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqCMGetSubEpRsp(NULL, &rsp); void *buf = rpcMallocCont(tlen); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - void *abuf = buf; + ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP; + ((SMqRspHead *)buf)->epoch = pConsumer->epoch; + + void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); tEncodeSMqCMGetSubEpRsp(&abuf, &rsp); tDeleteSMqCMGetSubEpRsp(&rsp); mndReleaseConsumer(pMnode, pConsumer); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 9faabc3874..9822550ee5 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -438,6 +438,33 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo } #endif +static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) { + SSdb *pSdb = pMnode->pSdb; + SDbObj *pDb = mndAcquireDb(pMnode, dbName); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + return -1; + } + + int32_t numOfTopics = 0; + void *pIter = NULL; + while (1) { + SMqTopicObj *pTopic = NULL; + pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic); + if (pIter == NULL) break; + + if (pTopic->dbUid == pDb->uid) { + numOfTopics++; + } + + sdbRelease(pSdb, pTopic); + } + + *pNumOfTopics = numOfTopics; + mndReleaseDb(pMnode, pDb); + return 0; +} + static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7c8c96fb54..5f5a434ec9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -220,7 +220,11 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { fetchOffset = pReq->currentOffset + 1; } - SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL}; + SMqConsumeRsp rsp = { + .consumerId = consumerId, + .numOfTopics = 0, + .pBlockData = NULL, + }; STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); if (pConsumer == NULL) { @@ -296,14 +300,16 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { } } - int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp); + int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqConsumeRsp(NULL, &rsp); void* buf = rpcMallocCont(tlen); if (buf == NULL) { pMsg->code = -1; return -1; } + ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; + ((SMqRspHead*)buf)->epoch = pReq->epoch; - void* abuf = buf; + void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); tEncodeSMqConsumeRsp(&abuf, &rsp); rsp.pBlockData = NULL; pMsg->pCont = buf; From a536f924f9ebdf4b4c11acc9e8035c2e26d6c1f2 Mon Sep 17 00:00:00 2001 From: afwerar <1296468573@qq.com> Date: Tue, 1 Mar 2022 16:35:54 +0800 Subject: [PATCH 03/11] [TD-13756]: file system only read error. --- source/os/src/osFile.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index fbb0e75257..2d77df9b43 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -198,6 +198,8 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { mode = (tdFileOptions & TD_FILE_TEXT) ? "at+" : "ab+"; }else if (tdFileOptions & TD_FILE_TRUNC) { mode = (tdFileOptions & TD_FILE_TEXT) ? "wt+" : "wb+"; + }else if ((tdFileOptions & TD_FILE_READ) && !(tdFileOptions & TD_FILE_WRITE)) { + mode = (tdFileOptions & TD_FILE_TEXT) ? "rt" : "rb"; }else { mode = (tdFileOptions & TD_FILE_TEXT) ? "rt+" : "rb+"; } From b5f5400d30a88acd46170f5125d170a67a29e2bc Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 1 Mar 2022 17:49:14 +0800 Subject: [PATCH 04/11] fix --- source/client/src/clientHb.c | 166 ++++++++++++++++----------------- source/client/src/tmq.c | 27 ++++-- source/dnode/vnode/src/tq/tq.c | 6 +- source/os/src/osFile.c | 2 +- 4 files changed, 104 insertions(+), 97 deletions(-) diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 45c1858948..0e309a8631 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -13,19 +13,17 @@ * along with this program. If not, see . */ -#include "clientInt.h" -#include "trpc.h" #include "catalog.h" +#include "clientInt.h" #include "clientLog.h" +#include "trpc.h" static SClientHbMgr clientHbMgr = {0}; static int32_t hbCreateThread(); static void hbStopThread(); -static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) { - return 0; -} +static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; } static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { int32_t code = 0; @@ -39,8 +37,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog int32_t numOfBatchs = taosArrayGetSize(batchUseRsp.pArray); for (int32_t i = 0; i < numOfBatchs; ++i) { SUseDbRsp *rsp = taosArrayGet(batchUseRsp.pArray, i); - tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%"PRIx64, rsp->db, rsp->vgVersion, rsp->uid); - + tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%" PRIx64, rsp->db, rsp->vgVersion, rsp->uid); + if (rsp->vgVersion < 0) { code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid); } else { @@ -60,8 +58,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog taosHashCleanup(vgInfo.vgHash); return TSDB_CODE_TSC_OUT_OF_MEMORY; } - } - + } + catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, &vgInfo); } @@ -106,8 +104,8 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo return TSDB_CODE_SUCCESS; } -static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) { - SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey)); +static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { + SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey)); if (NULL == info) { tscWarn("fail to get connInfo, may be dropped, connId:%d, type:%d", pRsp->connKey.connId, pRsp->connKey.hbType); return TSDB_CODE_SUCCESS; @@ -116,7 +114,7 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0; tscDebug("hb got %d rsp kv", kvNum); - + for (int32_t i = 0; i < kvNum; ++i) { SKv *kv = taosArrayGet(pRsp->info, i); switch (kv->key) { @@ -126,30 +124,30 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs break; } - int64_t *clusterId = (int64_t *)info->param; + int64_t *clusterId = (int64_t *)info->param; struct SCatalog *pCatalog = NULL; - + int32_t code = catalogGetHandle(*clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { - tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code)); + tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code)); break; } hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog); break; } - case HEARTBEAT_KEY_STBINFO:{ + case HEARTBEAT_KEY_STBINFO: { if (kv->valueLen <= 0 || NULL == kv->value) { tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value); break; } - int64_t *clusterId = (int64_t *)info->param; + int64_t *clusterId = (int64_t *)info->param; struct SCatalog *pCatalog = NULL; - + int32_t code = catalogGetHandle(*clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { - tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code)); + tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code)); break; } @@ -165,22 +163,22 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs return TSDB_CODE_SUCCESS; } -static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code) { +static int32_t hbMqAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) { static int32_t emptyRspNum = 0; if (code != 0) { tfree(param); return -1; } - char *key = (char *)param; + char *key = (char *)param; SClientHbBatchRsp pRsp = {0}; tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp); - + int32_t rspNum = taosArrayGetSize(pRsp.rsps); - SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); + SAppInstInfo **pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); if (pInst == NULL || NULL == *pInst) { - tscError("cluster not exist, key:%s", key); + tscError("cluster not exist, key:%s", key); tfree(param); tFreeClientHbBatchRsp(&pRsp); return -1; @@ -189,13 +187,14 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code tfree(param); if (rspNum) { - tscDebug("hb got %d rsp, %d empty rsp received before", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0)); + tscDebug("hb got %d rsp, %d empty rsp received before", rspNum, + atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0)); } else { atomic_add_fetch_32(&emptyRspNum, 1); } for (int32_t i = 0; i < rspNum; ++i) { - SClientHbRsp* rsp = taosArrayGet(pRsp.rsps, i); + SClientHbRsp *rsp = taosArrayGet(pRsp.rsps, i); code = (*clientHbMgr.rspHandle[rsp->connKey.hbType])((*pInst)->pAppHbMgr, rsp); if (code) { break; @@ -203,14 +202,14 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code } tFreeClientHbBatchRsp(&pRsp); - + return code; } int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { SDbVgVersion *dbs = NULL; - uint32_t dbNum = 0; - int32_t code = 0; + uint32_t dbNum = 0; + int32_t code = 0; code = catalogGetExpiredDBs(pCatalog, &dbs, &dbNum); if (TSDB_CODE_SUCCESS != code) { @@ -238,8 +237,8 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { SSTableMetaVersion *stbs = NULL; - uint32_t stbNum = 0; - int32_t code = 0; + uint32_t stbNum = 0; + int32_t code = 0; code = catalogGetExpiredSTables(pCatalog, &stbs, &stbNum); if (TSDB_CODE_SUCCESS != code) { @@ -254,7 +253,7 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC SSTableMetaVersion *stb = &stbs[i]; stb->suid = htobe64(stb->suid); stb->sversion = htons(stb->sversion); - stb->tversion = htons(stb->tversion); + stb->tversion = htons(stb->tversion); } SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = sizeof(SSTableMetaVersion) * stbNum, .value = stbs}; @@ -266,17 +265,16 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC return TSDB_CODE_SUCCESS; } - -int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) { - int64_t *clusterId = (int64_t *)param; +int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { + int64_t *clusterId = (int64_t *)param; struct SCatalog *pCatalog = NULL; int32_t code = catalogGetHandle(*clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { - tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code)); + tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code)); return code; } - + code = hbGetExpiredDBInfo(connKey, pCatalog, req); if (TSDB_CODE_SUCCESS != code) { return code; @@ -287,13 +285,10 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req return code; } - return TSDB_CODE_SUCCESS; } -int32_t hbMqHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) { - -} +int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; } void hbMgrInitMqHbHandle() { clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle; @@ -312,10 +307,8 @@ void hbFreeReq(void *req) { tFreeReqKvHash(pReq->info); } - - -SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { - SClientHbBatchReq* pBatchReq = calloc(1, sizeof(SClientHbBatchReq)); +SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { + SClientHbBatchReq *pBatchReq = calloc(1, sizeof(SClientHbBatchReq)); if (pBatchReq == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return NULL; @@ -324,11 +317,11 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq)); int32_t code = 0; - void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); + void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); while (pIter != NULL) { - SClientHbReq* pOneReq = pIter; + SClientHbReq *pOneReq = pIter; - SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey)); + SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey)); if (info) { code = (*clientHbMgr.reqHandle[pOneReq->connKey.hbType])(&pOneReq->connKey, info->param, pOneReq); if (code) { @@ -350,11 +343,10 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { return pBatchReq; } - void hbClearReqInfo(SAppHbMgr *pAppHbMgr) { void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); while (pIter != NULL) { - SClientHbReq* pOneReq = pIter; + SClientHbReq *pOneReq = pIter; tFreeReqKvHash(pOneReq->info); taosHashClear(pOneReq->info); @@ -363,31 +355,29 @@ void hbClearReqInfo(SAppHbMgr *pAppHbMgr) { } } - - -static void* hbThreadFunc(void* param) { +static void *hbThreadFunc(void *param) { setThreadName("hb"); while (1) { int8_t threadStop = atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 1, 2); - if(1 == threadStop) { + if (1 == threadStop) { break; } pthread_mutex_lock(&clientHbMgr.lock); int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); - for(int i = 0; i < sz; i++) { - SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); + for (int i = 0; i < sz; i++) { + SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt); if (connCnt == 0) { continue; } - SClientHbBatchReq* pReq = hbGatherAllInfo(pAppHbMgr); + SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr); if (pReq == NULL) { continue; } - int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq); + int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq); void *buf = malloc(tlen); if (buf == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -395,7 +385,7 @@ static void* hbThreadFunc(void* param) { hbClearReqInfo(pAppHbMgr); break; } - + tSerializeSClientHbBatchReq(buf, tlen, pReq); SMsgSendInfo *pInfo = calloc(1, sizeof(SMsgSendInfo)); @@ -415,17 +405,17 @@ static void* hbThreadFunc(void* param) { pInfo->requestObjRefId = 0; SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo; - int64_t transporterId = 0; - SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); + int64_t transporterId = 0; + SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo); - tFreeClientHbBatchReq(pReq, false); + tFreeClientHbBatchReq(pReq, false); hbClearReqInfo(pAppHbMgr); atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); } pthread_mutex_unlock(&clientHbMgr.lock); - + taosMsleep(HEARTBEAT_INTERVAL); } return NULL; @@ -449,17 +439,18 @@ static void hbStopThread() { tscDebug("hb thread already stopped"); return; } - + while (2 != atomic_load_8(&clientHbMgr.threadStop)) { usleep(10); } - tscDebug("hb thread stopped"); + tscDebug("hb thread stopped"); } -SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) { +SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) { + /*return NULL;*/ hbMgrInit(); - SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr)); + SAppHbMgr *pAppHbMgr = malloc(sizeof(SAppHbMgr)); if (pAppHbMgr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -495,7 +486,7 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) { pthread_mutex_lock(&clientHbMgr.lock); taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr); pthread_mutex_unlock(&clientHbMgr.lock); - + return pAppHbMgr; } @@ -504,7 +495,7 @@ void appHbMgrCleanup(void) { int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); for (int i = 0; i < sz; i++) { - SAppHbMgr* pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i); + SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i); taosHashCleanup(pTarget->activeInfo); pTarget->activeInfo = NULL; taosHashCleanup(pTarget->connInfo); @@ -515,11 +506,12 @@ void appHbMgrCleanup(void) { } int hbMgrInit() { + /*return 0;*/ // init once int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); if (old == 1) return 0; - clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void*)); + clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *)); pthread_mutex_init(&clientHbMgr.lock, NULL); // init handle funcs @@ -532,36 +524,36 @@ int hbMgrInit() { } void hbMgrCleanUp() { - return; + /*return;*/ hbStopThread(); - + // destroy all appHbMgr int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0); if (old == 0) return; pthread_mutex_lock(&clientHbMgr.lock); appHbMgrCleanup(); - taosArrayDestroy(clientHbMgr.appHbMgrs); + taosArrayDestroy(clientHbMgr.appHbMgrs); pthread_mutex_unlock(&clientHbMgr.lock); - + clientHbMgr.appHbMgrs = NULL; } -int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) { +int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) { // init hash in activeinfo - void* data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + void *data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); if (data != NULL) { return 0; } SClientHbReq hbReq; hbReq.connKey = connKey; hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); - + taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)); - + // init hash if (info != NULL) { - SClientHbReq * pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); info->req = pReq; taosHashPut(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey), info, sizeof(SHbConnInfo)); } @@ -570,9 +562,10 @@ int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo * return 0; } -int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { +int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { + /*return 0;*/ SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY}; - SHbConnInfo info = {0}; + SHbConnInfo info = {0}; switch (hbType) { case HEARTBEAT_TYPE_QUERY: { @@ -588,11 +581,12 @@ int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int3 default: break; } - + return hbRegisterConnImpl(pAppHbMgr, connKey, &info); } -void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) { +void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { + /*return;*/ int32_t code = 0; code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); code = taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); @@ -602,9 +596,11 @@ void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) { atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); } -int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) { +int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void *key, void *value, int32_t keyLen, + int32_t valueLen) { + return 0; // find req by connection id - SClientHbReq* pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); ASSERT(pReq != NULL); taosHashPut(pReq->info, key, keyLen, value, valueLen); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index acb96013c9..d9ab23b9fa 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -134,7 +134,7 @@ typedef struct { tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t)); conf->auto_commit = false; - conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST; + conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST; return conf; } @@ -651,13 +651,17 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { /*SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp));*/ tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); if (pRsp == NULL) { + printf("fail\n"); return -1; } memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp); /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ if (pRsp->consumeRsp.numOfTopics == 0) { - /*printf("no data\n");*/ + printf("no data\n"); + if (pParam->epoch == tmq->epoch) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + } taosFreeQitem(pRsp); return 0; } @@ -732,7 +736,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { return -1; } memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); - tDecodeSMqCMGetSubEpRsp(pMsg->pData, &pRsp->getEpRsp); + tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->getEpRsp); taosWriteQitem(tmq->mqueue, pRsp); } return 0; @@ -973,13 +977,18 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { tmqPollImpl(tmq, blocking_time); while (1) { + /*printf("cycle\n");*/ taosReadAllQitems(tmq->mqueue, tmq->qall); - tmqHandleAllRsp(tmq, blocking_time, true); - int64_t endTime = taosGetTimestampMs(); - if (endTime - startTime > blocking_time) { - printf("cycle end\n"); - usleep(1000 * 1000); - return NULL; + rspMsg = tmqHandleAllRsp(tmq, blocking_time, true); + if (rspMsg) { + return rspMsg; + } + if (blocking_time != 0) { + int64_t endTime = taosGetTimestampMs(); + if (endTime - startTime > blocking_time) { + printf("normal exit\n"); + return NULL; + } } } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5f5a434ec9..aa198d0806 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -278,14 +278,16 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { rsp.numOfTopics = 1; rsp.pBlockData = pRes; - int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp); + int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqConsumeRsp(NULL, &rsp); void* buf = rpcMallocCont(tlen); if (buf == NULL) { pMsg->code = -1; return -1; } + ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; + ((SMqRspHead*)buf)->epoch = pReq->epoch; - void* abuf = buf; + void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); tEncodeSMqConsumeRsp(&abuf, &rsp); taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock); pMsg->pCont = buf; diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index 2d77df9b43..751bbdbb09 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -682,4 +682,4 @@ int32_t taosEOFFile(TdFilePtr pFile) { assert(pFile->fp != NULL); return feof(pFile->fp); -} \ No newline at end of file +} From 4d0b8966705b973ae9be8487491079f34b3f924d Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 1 Mar 2022 17:58:02 +0800 Subject: [PATCH 05/11] merge from 3.0 --- source/client/src/clientHb.c | 12 ++++++------ source/client/src/tmq.c | 12 ++++++++++++ source/os/src/osFile.c | 14 +++++++++----- 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 0e309a8631..dcb30a6576 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -288,7 +288,7 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req return TSDB_CODE_SUCCESS; } -int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; } +int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {} void hbMgrInitMqHbHandle() { clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle; @@ -448,7 +448,7 @@ static void hbStopThread() { } SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) { - /*return NULL;*/ + return NULL; hbMgrInit(); SAppHbMgr *pAppHbMgr = malloc(sizeof(SAppHbMgr)); if (pAppHbMgr == NULL) { @@ -506,7 +506,7 @@ void appHbMgrCleanup(void) { } int hbMgrInit() { - /*return 0;*/ + return 0; // init once int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); if (old == 1) return 0; @@ -524,7 +524,7 @@ int hbMgrInit() { } void hbMgrCleanUp() { - /*return;*/ + return; hbStopThread(); // destroy all appHbMgr @@ -563,7 +563,7 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo * } int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { - /*return 0;*/ + return 0; SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY}; SHbConnInfo info = {0}; @@ -586,7 +586,7 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int3 } void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { - /*return;*/ + return; int32_t code = 0; code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); code = taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index d9ab23b9fa..4d740bcae8 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -659,9 +659,12 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ if (pRsp->consumeRsp.numOfTopics == 0) { printf("no data\n"); +<<<<<<< Updated upstream if (pParam->epoch == tmq->epoch) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); } +======= +>>>>>>> Stashed changes taosFreeQitem(pRsp); return 0; } @@ -979,6 +982,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { while (1) { /*printf("cycle\n");*/ taosReadAllQitems(tmq->mqueue, tmq->qall); +<<<<<<< Updated upstream rspMsg = tmqHandleAllRsp(tmq, blocking_time, true); if (rspMsg) { return rspMsg; @@ -990,6 +994,14 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { return NULL; } } +======= + tmqHandleAllRsp(tmq, blocking_time, true); + /*if (blocking_time != 0 && endTime - startTime > blocking_time) {*/ + /*int64_t endTime = taosGetTimestampMs();*/ + /*printf("normal exit\n");*/ + /*return NULL;*/ + /*}*/ +>>>>>>> Stashed changes } } diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index 751bbdbb09..aaabf40e05 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -196,11 +196,11 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { char *mode = NULL; if (tdFileOptions & TD_FILE_APPEND) { mode = (tdFileOptions & TD_FILE_TEXT) ? "at+" : "ab+"; - }else if (tdFileOptions & TD_FILE_TRUNC) { + } else if (tdFileOptions & TD_FILE_TRUNC) { mode = (tdFileOptions & TD_FILE_TEXT) ? "wt+" : "wb+"; - }else if ((tdFileOptions & TD_FILE_READ) && !(tdFileOptions & TD_FILE_WRITE)) { + } else if ((tdFileOptions & TD_FILE_READ) && !(tdFileOptions & TD_FILE_WRITE)) { mode = (tdFileOptions & TD_FILE_TEXT) ? "rt" : "rb"; - }else { + } else { mode = (tdFileOptions & TD_FILE_TEXT) ? "rt+" : "rb+"; } assert(!(tdFileOptions & TD_FILE_EXCL)); @@ -637,7 +637,7 @@ void taosFprintfFile(TdFilePtr pFile, const char *format, ...) { } assert(pFile->fp != NULL); - char buffer[MAX_FPRINTFLINE_BUFFER_SIZE] = {0}; + char buffer[MAX_FPRINTFLINE_BUFFER_SIZE] = {0}; va_list ap; va_start(ap, format); vfprintf(pFile->fp, format, ap); @@ -675,11 +675,15 @@ int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict__ ptrBuf) { size_t len = 0; return getline(ptrBuf, &len, pFile->fp); } -int32_t taosEOFFile(TdFilePtr pFile) { +int32_t taosEOFFile(TdFilePtr pFile) { if (pFile == NULL) { return 0; } assert(pFile->fp != NULL); +<<<<<<< Updated upstream return feof(pFile->fp); +======= + return feof(pFile->fp); +>>>>>>> Stashed changes } From a68153ae9275d90af2d81aa5b7128f74ef7328d9 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 1 Mar 2022 18:00:50 +0800 Subject: [PATCH 06/11] merge from 3.0 --- source/client/src/tmq.c | 12 ------------ source/os/src/osFile.c | 4 ---- 2 files changed, 16 deletions(-) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 4d740bcae8..d9ab23b9fa 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -659,12 +659,9 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ if (pRsp->consumeRsp.numOfTopics == 0) { printf("no data\n"); -<<<<<<< Updated upstream if (pParam->epoch == tmq->epoch) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); } -======= ->>>>>>> Stashed changes taosFreeQitem(pRsp); return 0; } @@ -982,7 +979,6 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { while (1) { /*printf("cycle\n");*/ taosReadAllQitems(tmq->mqueue, tmq->qall); -<<<<<<< Updated upstream rspMsg = tmqHandleAllRsp(tmq, blocking_time, true); if (rspMsg) { return rspMsg; @@ -994,14 +990,6 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { return NULL; } } -======= - tmqHandleAllRsp(tmq, blocking_time, true); - /*if (blocking_time != 0 && endTime - startTime > blocking_time) {*/ - /*int64_t endTime = taosGetTimestampMs();*/ - /*printf("normal exit\n");*/ - /*return NULL;*/ - /*}*/ ->>>>>>> Stashed changes } } diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index aaabf40e05..136afb9a15 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -681,9 +681,5 @@ int32_t taosEOFFile(TdFilePtr pFile) { } assert(pFile->fp != NULL); -<<<<<<< Updated upstream - return feof(pFile->fp); -======= return feof(pFile->fp); ->>>>>>> Stashed changes } From 306cd7bef392238400fb79e8be3b0da52e6be653 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 1 Mar 2022 18:09:43 +0800 Subject: [PATCH 07/11] add hb back --- source/client/src/clientHb.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index dcb30a6576..c7e329c6e6 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -448,7 +448,7 @@ static void hbStopThread() { } SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) { - return NULL; + /*return NULL;*/ hbMgrInit(); SAppHbMgr *pAppHbMgr = malloc(sizeof(SAppHbMgr)); if (pAppHbMgr == NULL) { @@ -506,7 +506,7 @@ void appHbMgrCleanup(void) { } int hbMgrInit() { - return 0; + /*return 0;*/ // init once int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); if (old == 1) return 0; @@ -524,7 +524,7 @@ int hbMgrInit() { } void hbMgrCleanUp() { - return; + /*return;*/ hbStopThread(); // destroy all appHbMgr @@ -563,7 +563,7 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo * } int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { - return 0; + /*return 0;*/ SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY}; SHbConnInfo info = {0}; @@ -586,7 +586,7 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int3 } void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { - return; + /*return;*/ int32_t code = 0; code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); code = taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); From 6a6bab9e9865284e9ca06b1862fac3b985729da8 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 1 Mar 2022 19:17:35 +0800 Subject: [PATCH 08/11] fix hb crash --- source/common/src/tmsg.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 8a3cd0a718..34b2932f9a 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -270,7 +270,7 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR int32_t rspNum = 0; if (tDecodeI32(&decoder, &rspNum) < 0) return -1; if (pBatchRsp->rsps == NULL) { - pBatchRsp->rsps = taosArrayInit(rspNum, sizeof(SClientHbReq)); + pBatchRsp->rsps = taosArrayInit(rspNum, sizeof(SClientHbRsp)); } for (int32_t i = 0; i < rspNum; i++) { SClientHbRsp rsp = {0}; @@ -1527,7 +1527,7 @@ int32_t tDeserializeSUseDbRspImp(SCoder *pDecoder, SUseDbRsp *pRsp) { if (pRsp->vgNum <= 0) { return 0; } - + pRsp->pVgroupInfos = taosArrayInit(pRsp->vgNum, sizeof(SVgroupInfo)); if (pRsp->pVgroupInfos == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; From a7533ac2b26c941bb35e2fcda0c6cea069594ef9 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 1 Mar 2022 19:31:07 +0800 Subject: [PATCH 09/11] add hb back --- source/client/src/clientHb.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index c7e329c6e6..e9e64a78b8 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -524,7 +524,7 @@ int hbMgrInit() { } void hbMgrCleanUp() { - /*return;*/ + return; hbStopThread(); // destroy all appHbMgr From e697f6301356c9b5f402b8805ce222345f4516c6 Mon Sep 17 00:00:00 2001 From: afwerar <1296468573@qq.com> Date: Tue, 1 Mar 2022 19:34:24 +0800 Subject: [PATCH 10/11] [TD-13736]: console exit input error. --- tools/shell/src/shellMain.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tools/shell/src/shellMain.c b/tools/shell/src/shellMain.c index 8a1763c4fc..f62c43773d 100644 --- a/tools/shell/src/shellMain.c +++ b/tools/shell/src/shellMain.c @@ -41,9 +41,11 @@ void *cancelHandler(void *arg) { taosReleaseRef(tscObjRef, rid); #endif #else + reset_terminal_mode(); printf("\nReceive ctrl+c or other signal, quit shell.\n"); exit(0); #endif + reset_terminal_mode(); printf("\nReceive ctrl+c or other signal, quit shell.\n"); exit(0); } From 539f1238ba777b086e25d39987b1e680a901d49a Mon Sep 17 00:00:00 2001 From: afwerar <1296468573@qq.com> Date: Tue, 1 Mar 2022 22:11:38 +0800 Subject: [PATCH 11/11] [TD-13736]: console exit input error. --- source/common/src/tglobal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index db84f87b66..00cf770e6b 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -271,7 +271,7 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) { if (cfgAddTimezone(pCfg, "timezone", tsTimezone) != 0) return -1; if (cfgAddLocale(pCfg, "locale", tsLocale) != 0) return -1; if (cfgAddCharset(pCfg, "charset", tsCharset) != 0) return -1; - if (cfgAddBool(pCfg, "enableCoreFile", 0, 1) != 0) return -1; + if (cfgAddBool(pCfg, "enableCoreFile", 1, 1) != 0) return -1; if (cfgAddInt32(pCfg, "numOfCores", tsNumOfCores, 1, 100000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "pageSize(KB)", tsPageSize, 0, INT64_MAX, 1) != 0) return -1; if (cfgAddInt64(pCfg, "openMax", tsOpenMax, 0, INT64_MAX, 1) != 0) return -1;