Merge pull request #11118 from taosdata/feature/tq

Feature/tq
This commit is contained in:
Liu Jicong 2022-03-30 14:38:16 +08:00 committed by GitHub
commit f65908a095
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 79 additions and 52 deletions

View File

@ -1,16 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "consumer.h"

View File

@ -241,6 +241,10 @@ void tmq_list_destroy(tmq_list_t* list) {
taosArrayDestroyEx(container, (void (*)(void*))taosMemoryFree);
}
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
return sprintf(dst, "%s:%d", topicName, vg);
}
void tmqClearUnhandleMsg(tmq_t* tmq) {
tmq_message_t* msg = NULL;
while (1) {
@ -827,7 +831,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqClientVg* pVg = pParam->pVg;
tmq_t* tmq = pParam->tmq;
if (code != 0) {
printf("msg discard, code:%x\n", code);
tscWarn("msg discard, code:%x", code);
goto WRITE_QUEUE_FAIL;
}
@ -835,12 +839,12 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
if (msgEpoch < tmqEpoch) {
tsem_post(&tmq->rspSem);
printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);
tscWarn("discard rsp epoch %d, current epoch %d", msgEpoch, tmqEpoch);
return 0;
}
if (msgEpoch != tmqEpoch) {
printf("mismatch rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);
tscWarn("mismatch rsp epoch %d, current epoch %d", msgEpoch, tmqEpoch);
} else {
atomic_sub_fetch_32(&tmq->waitingRequest, 1);
}
@ -899,19 +903,54 @@ WRITE_QUEUE_FAIL:
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
/*printf("call update ep %d\n", epoch);*/
bool set = false;
int32_t sz = taosArrayGetSize(pRsp->topics);
SArray* newTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
for (int32_t i = 0; i < sz; i++) {
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
if (newTopics == NULL) {
return false;
}
SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
if (pHash == NULL) {
taosArrayDestroy(newTopics);
return false;
}
// find topic, build hash
for (int32_t i = 0; i < topicNumGet; i++) {
SMqClientTopic topic = {0};
SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
taosHashClear(pHash);
topic.topicName = strdup(pTopicEp->topic);
int32_t vgSz = taosArrayGetSize(pTopicEp->vgs);
topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg));
for (int32_t j = 0; j < vgSz; j++) {
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
for (int32_t j = 0; j < topicNumCur; j++) {
// find old topic
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
if (pTopicCur->vgs && strcmp(pTopicCur->topicName, pTopicEp->topic) == 0) {
int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
if (vgNumCur == 0) break;
for (int32_t k = 0; k < vgNumCur; k++) {
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId);
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
}
break;
}
}
int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
for (int32_t j = 0; j < vgNumGet; j++) {
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
int64_t offset = pVgEp->offset;
if (pOffset != NULL) {
offset = *pOffset;
}
SMqClientVg clientVg = {
.pollCnt = 0,
.currentOffset = pVgEp->offset,
.currentOffset = offset,
.vgId = pVgEp->vgId,
.epSet = pVgEp->epSet,
.vgStatus = TMQ_VG_STATUS__IDLE,
@ -922,6 +961,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
taosArrayPush(newTopics, &topic);
}
if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
taosHashCleanup(pHash);
tmq->clientTopics = newTopics;
atomic_store_32(&tmq->epoch, epoch);
return set;
@ -931,7 +971,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
tmq_t* tmq = pParam->tmq;
if (code != 0) {
printf("get topic endpoint error, not ready, wait:%d\n", pParam->sync);
tscError("get topic endpoint error, not ready, wait:%d\n", pParam->sync);
goto END;
}
@ -1302,6 +1342,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
while (1) {
/*printf("cycle\n");*/
tmqAskEp(tmq, false);
tmqPollImpl(tmq, blocking_time);
tsem_wait(&tmq->rspSem);

View File

@ -76,7 +76,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SVnodeObj *pVnode = pInfo->ahandle;
dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg, pInfo);
if (code != 0) {
vmSendRsp(pVnode->pWrapper, pMsg, code);
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
@ -168,7 +168,7 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
taosGetQitem(qall, (void **)&pMsg);
dTrace("msg:%p, will be processed in vnode-merge queue", pMsg);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg, pInfo);
if (code != 0) {
vmSendRsp(pVnode->pWrapper, pMsg, code);
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
@ -414,8 +414,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
pWPool->max = maxMergeThreads;
if (tWWorkerInit(pWPool) != 0) return -1;
SSingleWorkerCfg cfg = {
.min = 1, .max = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt};
SSingleWorkerCfg cfg = {.min = 1, .max = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) {
dError("failed to start vnode-mgmt worker since %s", terrstr());
return -1;

View File

@ -26,7 +26,7 @@
#include "parser.h"
#include "tname.h"
#define MND_TOPIC_VER_NUMBER 1
#define MND_TOPIC_VER_NUMBER 1
#define MND_TOPIC_RESERVE_SIZE 64
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);

View File

@ -17,8 +17,9 @@
#define _TD_VNODE_H_
#include "os.h"
#include "trpc.h"
#include "tmsgcb.h"
#include "tqueue.h"
#include "trpc.h"
#include "meta.h"
#include "tarray.h"
@ -166,7 +167,7 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
* @param pMsg The request message
* @return int 0 for success, -1 for failure
*/
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg);
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
/* ------------------------ SVnodeCfg ------------------------ */
/**
@ -185,7 +186,6 @@ void vnodeOptionsClear(SVnodeCfg *pOptions);
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName);
/* ------------------------ FOR COMPILE ------------------------ */
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg);

View File

@ -197,9 +197,9 @@ int tqCommit(STQ*);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
int32_t tqProcessRebReq(STQ* pTq, char* msg);
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen);
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId);
// sma
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);

View File

@ -356,7 +356,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
ASSERT(0);
return -1;
}
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
@ -490,7 +489,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
return 0;
}
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId) {
void* pIter = NULL;
while (1) {
@ -498,14 +497,14 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
if (pIter == NULL) break;
SStreamTask* pTask = (SStreamTask*)pIter;
if (streamExecTask(pTask, &pTq->pVnode->msgCb, data, STREAM_DATA_TYPE_SUBMIT_BLOCK, 0) < 0) {
if (streamExecTask(pTask, &pTq->pVnode->msgCb, data, STREAM_DATA_TYPE_SUBMIT_BLOCK, workerId) < 0) {
// TODO
}
}
return 0;
}
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen) {
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId) {
SStreamTaskExecReq req;
tDecodeSStreamTaskExecReq(msg, &req);
@ -515,7 +514,7 @@ int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen) {
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
ASSERT(pTask);
if (streamExecTask(pTask, &pTq->pVnode->msgCb, req.data, STREAM_DATA_TYPE_SSDATA_BLOCK, 0) < 0) {
if (streamExecTask(pTask, &pTq->pVnode->msgCb, req.data, STREAM_DATA_TYPE_SSDATA_BLOCK, workerId) < 0) {
// TODO
}
return 0;

View File

@ -167,8 +167,10 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
break;
}
// TODO handle null
colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL);
if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) {
taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock);
return NULL;
}
}
curRow++;
}

View File

@ -41,7 +41,7 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
}
}
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("message in fetch queue is processing");
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
@ -69,9 +69,9 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TASK_PIPE_EXEC:
case TDMT_VND_TASK_MERGE_EXEC:
return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen);
return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, pInfo->workerId);
case TDMT_VND_STREAM_TRIGGER:
return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen);
return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, pInfo->workerId);
case TDMT_VND_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
default:

View File

@ -184,17 +184,18 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
}
} break;
case TDMT_VND_TASK_WRITE_EXEC: {
if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead),
0) < 0) {
}
} break;
case TDMT_VND_CREATE_SMA: { // timeRangeSMA
#if 1
SSmaCfg vCreateSmaReq = {0};
if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
vWarn("vgId%d: TDMT_VND_CREATE_SMA received but deserialize failed since %s", pVnode->config.vgId, terrstr(terrno));
vWarn("vgId%d: TDMT_VND_CREATE_SMA received but deserialize failed since %s", pVnode->config.vgId,
terrstr(terrno));
return -1;
}
vWarn("vgId%d: TDMT_VND_CREATE_SMA received for %s:%" PRIi64, pVnode->config.vgId, vCreateSmaReq.tSma.indexName,

View File

@ -387,11 +387,12 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog
printf("subscribe err\n");
return;
}
/*taosSsleep(3);*/
int32_t batchCnt = 0;
int32_t skipLogNum = 0;
int64_t startTime = taosGetTimestampUs();
while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1);
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 3000);
if (tmqmessage) {
batchCnt++;
skipLogNum += tmqGetSkipLogNum(tmqmessage);