diff --git a/source/client/consumer/consumer.c b/source/client/consumer/consumer.c deleted file mode 100644 index 4ba1f95144..0000000000 --- a/source/client/consumer/consumer.c +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "consumer.h" diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index f7593595b0..1fa267ae7e 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -241,6 +241,10 @@ void tmq_list_destroy(tmq_list_t* list) { taosArrayDestroyEx(container, (void (*)(void*))taosMemoryFree); } +static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) { + return sprintf(dst, "%s:%d", topicName, vg); +} + void tmqClearUnhandleMsg(tmq_t* tmq) { tmq_message_t* msg = NULL; while (1) { @@ -827,7 +831,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqClientVg* pVg = pParam->pVg; tmq_t* tmq = pParam->tmq; if (code != 0) { - printf("msg discard, code:%x\n", code); + tscWarn("msg discard, code:%x", code); goto WRITE_QUEUE_FAIL; } @@ -835,12 +839,12 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmqEpoch = atomic_load_32(&tmq->epoch); if (msgEpoch < tmqEpoch) { tsem_post(&tmq->rspSem); - printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch); + tscWarn("discard rsp epoch %d, current epoch %d", msgEpoch, tmqEpoch); return 0; } if (msgEpoch != tmqEpoch) { - printf("mismatch rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch); + tscWarn("mismatch rsp epoch %d, current epoch %d", msgEpoch, tmqEpoch); } else { atomic_sub_fetch_32(&tmq->waitingRequest, 1); } @@ -899,19 +903,54 @@ WRITE_QUEUE_FAIL: bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { /*printf("call update ep %d\n", epoch);*/ bool set = false; - int32_t sz = taosArrayGetSize(pRsp->topics); - SArray* newTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); - for (int32_t i = 0; i < sz; i++) { + int32_t topicNumGet = taosArrayGetSize(pRsp->topics); + char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; + SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic)); + if (newTopics == NULL) { + return false; + } + SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); + if (pHash == NULL) { + taosArrayDestroy(newTopics); + return false; + } + + // find topic, build hash + for (int32_t i = 0; i < topicNumGet; i++) { SMqClientTopic topic = {0}; SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i); + taosHashClear(pHash); topic.topicName = strdup(pTopicEp->topic); - int32_t vgSz = taosArrayGetSize(pTopicEp->vgs); - topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg)); - for (int32_t j = 0; j < vgSz; j++) { + + int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); + for (int32_t j = 0; j < topicNumCur; j++) { + // find old topic + SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j); + if (pTopicCur->vgs && strcmp(pTopicCur->topicName, pTopicEp->topic) == 0) { + int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs); + if (vgNumCur == 0) break; + for (int32_t k = 0; k < vgNumCur; k++) { + SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k); + sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId); + taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t)); + } + break; + } + } + + int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs); + topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg)); + for (int32_t j = 0; j < vgNumGet; j++) { SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); + sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId); + int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey)); + int64_t offset = pVgEp->offset; + if (pOffset != NULL) { + offset = *pOffset; + } SMqClientVg clientVg = { .pollCnt = 0, - .currentOffset = pVgEp->offset, + .currentOffset = offset, .vgId = pVgEp->vgId, .epSet = pVgEp->epSet, .vgStatus = TMQ_VG_STATUS__IDLE, @@ -922,6 +961,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { taosArrayPush(newTopics, &topic); } if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics); + taosHashCleanup(pHash); tmq->clientTopics = newTopics; atomic_store_32(&tmq->epoch, epoch); return set; @@ -931,7 +971,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; tmq_t* tmq = pParam->tmq; if (code != 0) { - printf("get topic endpoint error, not ready, wait:%d\n", pParam->sync); + tscError("get topic endpoint error, not ready, wait:%d\n", pParam->sync); goto END; } @@ -1302,6 +1342,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { while (1) { /*printf("cycle\n");*/ + tmqAskEp(tmq, false); tmqPollImpl(tmq, blocking_time); tsem_wait(&tmq->rspSem); diff --git a/source/dnode/mgmt/vm/src/vmWorker.c b/source/dnode/mgmt/vm/src/vmWorker.c index 193807317f..9d62624756 100644 --- a/source/dnode/mgmt/vm/src/vmWorker.c +++ b/source/dnode/mgmt/vm/src/vmWorker.c @@ -76,7 +76,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { SVnodeObj *pVnode = pInfo->ahandle; dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg); - int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg); + int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg, pInfo); if (code != 0) { vmSendRsp(pVnode->pWrapper, pMsg, code); dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); @@ -168,7 +168,7 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO taosGetQitem(qall, (void **)&pMsg); dTrace("msg:%p, will be processed in vnode-merge queue", pMsg); - int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg); + int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg, pInfo); if (code != 0) { vmSendRsp(pVnode->pWrapper, pMsg, code); dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); @@ -414,8 +414,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { pWPool->max = maxMergeThreads; if (tWWorkerInit(pWPool) != 0) return -1; - SSingleWorkerCfg cfg = { - .min = 1, .max = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt}; + SSingleWorkerCfg cfg = {.min = 1, .max = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt}; if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) { dError("failed to start vnode-mgmt worker since %s", terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index c0f591d1f2..13ccc912d6 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -26,7 +26,7 @@ #include "parser.h" #include "tname.h" -#define MND_TOPIC_VER_NUMBER 1 +#define MND_TOPIC_VER_NUMBER 1 #define MND_TOPIC_RESERVE_SIZE 64 static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index dd4b8d84ca..c1c000295a 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -17,8 +17,9 @@ #define _TD_VNODE_H_ #include "os.h" -#include "trpc.h" #include "tmsgcb.h" +#include "tqueue.h" +#include "trpc.h" #include "meta.h" #include "tarray.h" @@ -166,7 +167,7 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); * @param pMsg The request message * @return int 0 for success, -1 for failure */ -int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg); +int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); /* ------------------------ SVnodeCfg ------------------------ */ /** @@ -185,7 +186,6 @@ void vnodeOptionsClear(SVnodeCfg *pOptions); int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName); - /* ------------------------ FOR COMPILE ------------------------ */ int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 5ec5b1d58f..e5d1f952a8 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -197,9 +197,9 @@ int tqCommit(STQ*); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessRebReq(STQ* pTq, char* msg); -int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen); +int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId); int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); -int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen); +int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId); // sma void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fce423d811..06a2350aab 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -356,7 +356,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { void* buf = rpcMallocCont(tlen); if (buf == NULL) { pMsg->code = -1; - ASSERT(0); return -1; } ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; @@ -490,7 +489,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { return 0; } -int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) { +int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId) { void* pIter = NULL; while (1) { @@ -498,14 +497,14 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) { if (pIter == NULL) break; SStreamTask* pTask = (SStreamTask*)pIter; - if (streamExecTask(pTask, &pTq->pVnode->msgCb, data, STREAM_DATA_TYPE_SUBMIT_BLOCK, 0) < 0) { + if (streamExecTask(pTask, &pTq->pVnode->msgCb, data, STREAM_DATA_TYPE_SUBMIT_BLOCK, workerId) < 0) { // TODO } } return 0; } -int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen) { +int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId) { SStreamTaskExecReq req; tDecodeSStreamTaskExecReq(msg, &req); @@ -515,7 +514,7 @@ int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen) { SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); ASSERT(pTask); - if (streamExecTask(pTask, &pTq->pVnode->msgCb, req.data, STREAM_DATA_TYPE_SSDATA_BLOCK, 0) < 0) { + if (streamExecTask(pTask, &pTq->pVnode->msgCb, req.data, STREAM_DATA_TYPE_SSDATA_BLOCK, workerId) < 0) { // TODO } return 0; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 3d9d9a90dd..9c0b0802ab 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -167,8 +167,10 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) { break; } - // TODO handle null - colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL); + if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) { + taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock); + return NULL; + } } curRow++; } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 8ebdad48f7..598647f797 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -41,7 +41,7 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { } } -int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { +int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { vTrace("message in fetch queue is processing"); char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); @@ -69,9 +69,9 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_VND_TASK_PIPE_EXEC: case TDMT_VND_TASK_MERGE_EXEC: - return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen); + return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, pInfo->workerId); case TDMT_VND_STREAM_TRIGGER: - return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen); + return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, pInfo->workerId); case TDMT_VND_QUERY_HEARTBEAT: return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg); default: diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index c220e6001f..8875090b43 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -184,17 +184,18 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } } break; case TDMT_VND_TASK_WRITE_EXEC: { - if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), - pMsg->contLen - sizeof(SMsgHead)) < 0) { + if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead), + 0) < 0) { } } break; case TDMT_VND_CREATE_SMA: { // timeRangeSMA #if 1 - + SSmaCfg vCreateSmaReq = {0}; if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - vWarn("vgId%d: TDMT_VND_CREATE_SMA received but deserialize failed since %s", pVnode->config.vgId, terrstr(terrno)); + vWarn("vgId%d: TDMT_VND_CREATE_SMA received but deserialize failed since %s", pVnode->config.vgId, + terrstr(terrno)); return -1; } vWarn("vgId%d: TDMT_VND_CREATE_SMA received for %s:%" PRIi64, pVnode->config.vgId, vCreateSmaReq.tSma.indexName, diff --git a/tests/test/c/tmqDemo.c b/tests/test/c/tmqDemo.c index 182d40c96a..d339166d74 100644 --- a/tests/test/c/tmqDemo.c +++ b/tests/test/c/tmqDemo.c @@ -387,11 +387,12 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog printf("subscribe err\n"); return; } + /*taosSsleep(3);*/ int32_t batchCnt = 0; int32_t skipLogNum = 0; int64_t startTime = taosGetTimestampUs(); while (running) { - tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1); + tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 3000); if (tmqmessage) { batchCnt++; skipLogNum += tmqGetSkipLogNum(tmqmessage);