From 3f12156c537d968005966b4571036cfbc07caf9b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 19 Apr 2023 13:56:42 +0800 Subject: [PATCH] enh(tmq): support seek in tmq. --- source/client/src/clientTmq.c | 183 +++++++++++++------------- source/dnode/vnode/src/tq/tqRestore.c | 21 --- 2 files changed, 90 insertions(+), 114 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index e6dc9881fe..d6ad4c3db9 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -560,6 +560,21 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN return TSDB_CODE_SUCCESS; } +static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) { + int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); + for (int32_t i = 0; i < numOfTopics; ++i) { + SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + if (strcmp(pTopic->topicName, pTopicName) != 0) { + continue; + } + + return pTopic; + } + + tscError("consumer:0x%" PRIx64 ", failed to find topic:%s", tmq->consumerId, pTopicName); + return NULL; +} + static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* pCommitFp, void* userParam) { char* pTopicName = NULL; int32_t vgId = 0; @@ -602,15 +617,8 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* p tscDebug("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId); - int32_t i = 0; - for (; i < numOfTopics; i++) { - SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - if (strcmp(pTopic->topicName, pTopicName) == 0) { - break; - } - } - - if (i == numOfTopics) { + SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName); + if (pTopic == NULL) { tscWarn("consumer:0x%" PRIx64 " failed to find the specified topic:%s, total topics:%d", tmq->consumerId, pTopicName, numOfTopics); taosMemoryFree(pParamSet); @@ -618,8 +626,6 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* p return; } - SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - int32_t j = 0; int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); for (j = 0; j < numOfVgroups; j++) { @@ -2334,20 +2340,7 @@ SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) { return NULL; } -static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) { - int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); - for (int32_t i = 0; i < numOfTopics; ++i) { - SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - if (strcmp(pTopic->topicName, pTopicName) != 0) { - continue; - } - return pTopic; - } - - tscError("consumer:0x%" PRIx64 ", failed to find topic:%s", tmq->consumerId, pTopicName); - return NULL; -} int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment, int32_t* numOfAssignment) { @@ -2408,74 +2401,78 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgroupHandle return TSDB_CODE_INVALID_PARA; } - return 0; -#if 0 - // tmq_commit_sync(tmq, ); - { - SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); - if (pParamSet == NULL) { -// pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam); - return -1; - } - - pParamSet->refId = tmq->refId; - pParamSet->epoch = tmq->epoch; - pParamSet->callbackFn = pCommitFp; - pParamSet->userParam = userParam; - - int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); - - tscDebug("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId); - - int32_t i = 0; - for (; i < numOfTopics; i++) { - SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - if (strcmp(pTopic->topicName, pTopicName) == 0) { - break; - } - } - - if (i == numOfTopics) { - tscWarn("consumer:0x%" PRIx64 " failed to find the specified topic:%s, total topics:%d", tmq->consumerId, - pTopicName, numOfTopics); - taosMemoryFree(pParamSet); - pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam); - return; - } - - SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - - int32_t j = 0; - int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); - for (j = 0; j < numOfVgroups; j++) { - SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - if (pVg->vgId == vgId) { - break; - } - } - - if (j == numOfVgroups) { - tscWarn("consumer:0x%" PRIx64 " failed to find the specified vgId:%d, total Vgs:%d, topic:%s", tmq->consumerId, - vgId, numOfVgroups, pTopicName); - taosMemoryFree(pParamSet); - pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam); - return; - } - - SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - if (pVg->offsetInfo.currentOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset)) { - code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups); - - // failed to commit, callback user function directly. - if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(pParamSet); - pCommitFp(tmq, code, userParam); - } - } else { // do not perform commit, callback user function directly. - taosMemoryFree(pParamSet); - pCommitFp(tmq, code, userParam); - } - } -#endif +// return 0; +//#if 0 + SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId}; + tstrncpy(rspObj.topic, pTopicName, tListLen(rspObj.topic)); + tmq_commit_sync(tmq, &rspObj); + // { + // SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); + // if (pParamSet == NULL) { + //// pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam); + // return -1; + // } + // + // pParamSet->refId = tmq->refId; + // pParamSet->epoch = tmq->epoch; + // pParamSet->callbackFn = pCommitFp; + // pParamSet->userParam = userParam; + // + // int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); + // + // tscDebug("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId); + // + // int32_t i = 0; + // for (; i < numOfTopics; i++) { + // SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + // if (strcmp(pTopic->topicName, pTopicName) == 0) { + // break; + // } + // } + // + // if (i == numOfTopics) { + // tscWarn("consumer:0x%" PRIx64 " failed to find the specified topic:%s, total topics:%d", tmq->consumerId, + // pTopicName, numOfTopics); + // taosMemoryFree(pParamSet); + // pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam); + // return; + // } + // + // SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + // + // int32_t j = 0; + // int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); + // for (j = 0; j < numOfVgroups; j++) { + // SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + // if (pVg->vgId == vgId) { + // break; + // } + // } + // + // if (j == numOfVgroups) { + // tscWarn("consumer:0x%" PRIx64 " failed to find the specified vgId:%d, total Vgs:%d, topic:%s", + // tmq->consumerId, + // vgId, numOfVgroups, pTopicName); + // taosMemoryFree(pParamSet); + // pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam); + // return; + // } + // + // SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + // if (pVg->offsetInfo.currentOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.currentOffset, + // &pVg->offsetInfo.committedOffset)) { + // code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups); + // + // // failed to commit, callback user function directly. + // if (code != TSDB_CODE_SUCCESS) { + // taosMemoryFree(pParamSet); + // pCommitFp(tmq, code, userParam); + // } + // } else { // do not perform commit, callback user function directly. + // taosMemoryFree(pParamSet); + // pCommitFp(tmq, code, userParam); + // } + // } + //#endif } \ No newline at end of file diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index cba51cdee4..54fcc04b62 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -55,27 +55,6 @@ int tqStreamTasksScanWal(STQ* pTq) { return 0; } -//int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList) { -// int32_t numOfTask = taosArrayGetSize(pTaskList); -// if (numOfTask <= 0) { -// return TSDB_CODE_SUCCESS; -// } -// -// // todo: add lock -// for (int32_t i = 0; i < numOfTask; ++i) { -// SStreamTask* pTask = taosArrayGetP(pTaskList, i); -// tqDebug("vgId:%d transfer s-task:%s state restore -> ready, checkpoint:%" PRId64 " checkpoint id:%" PRId64, -// pStreamMeta->vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->chkInfo.id); -// taosHashRemove(pStreamMeta->pWalReadTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); -// -// // NOTE: do not change the following order -// atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); -// taosHashPut(pStreamMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES); -// } -// -// return TSDB_CODE_SUCCESS; -//} - int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle) { void* pIter = NULL; int32_t vgId = pStreamMeta->vgId;