merge from 3.0

This commit is contained in:
Liu Jicong 2022-03-29 18:58:15 +08:00
parent 2aa586ced4
commit a9ad7ebd10
7 changed files with 24 additions and 23 deletions

View File

@ -76,7 +76,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SVnodeObj *pVnode = pInfo->ahandle; SVnodeObj *pVnode = pInfo->ahandle;
dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg); dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg); int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg, pInfo);
if (code != 0) { if (code != 0) {
vmSendRsp(pVnode->pWrapper, pMsg, code); vmSendRsp(pVnode->pWrapper, pMsg, code);
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
@ -168,7 +168,7 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
taosGetQitem(qall, (void **)&pMsg); taosGetQitem(qall, (void **)&pMsg);
dTrace("msg:%p, will be processed in vnode-merge queue", pMsg); dTrace("msg:%p, will be processed in vnode-merge queue", pMsg);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg); int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg, pInfo);
if (code != 0) { if (code != 0) {
vmSendRsp(pVnode->pWrapper, pMsg, code); vmSendRsp(pVnode->pWrapper, pMsg, code);
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
@ -414,8 +414,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
pWPool->max = maxMergeThreads; pWPool->max = maxMergeThreads;
if (tWWorkerInit(pWPool) != 0) return -1; if (tWWorkerInit(pWPool) != 0) return -1;
SSingleWorkerCfg cfg = { SSingleWorkerCfg cfg = {.min = 1, .max = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt};
.min = 1, .max = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) { if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) {
dError("failed to start vnode-mgmt worker since %s", terrstr()); dError("failed to start vnode-mgmt worker since %s", terrstr());
return -1; return -1;

View File

@ -17,8 +17,9 @@
#define _TD_VNODE_H_ #define _TD_VNODE_H_
#include "os.h" #include "os.h"
#include "trpc.h"
#include "tmsgcb.h" #include "tmsgcb.h"
#include "tqueue.h"
#include "trpc.h"
#include "meta.h" #include "meta.h"
#include "tarray.h" #include "tarray.h"
@ -166,7 +167,7 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
* @param pMsg The request message * @param pMsg The request message
* @return int 0 for success, -1 for failure * @return int 0 for success, -1 for failure
*/ */
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg); int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
/* ------------------------ SVnodeCfg ------------------------ */ /* ------------------------ SVnodeCfg ------------------------ */
/** /**
@ -185,7 +186,6 @@ void vnodeOptionsClear(SVnodeCfg *pOptions);
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName); int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName);
/* ------------------------ FOR COMPILE ------------------------ */ /* ------------------------ FOR COMPILE ------------------------ */
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg);

View File

@ -197,9 +197,9 @@ int tqCommit(STQ*);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
int32_t tqProcessRebReq(STQ* pTq, char* msg); int32_t tqProcessRebReq(STQ* pTq, char* msg);
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen); int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId);
// sma // sma
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);

View File

@ -356,7 +356,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
void* buf = rpcMallocCont(tlen); void* buf = rpcMallocCont(tlen);
if (buf == NULL) { if (buf == NULL) {
pMsg->code = -1; pMsg->code = -1;
ASSERT(0);
return -1; return -1;
} }
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
@ -490,7 +489,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
return 0; return 0;
} }
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) { int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId) {
void* pIter = NULL; void* pIter = NULL;
while (1) { while (1) {
@ -498,14 +497,14 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
if (pIter == NULL) break; if (pIter == NULL) break;
SStreamTask* pTask = (SStreamTask*)pIter; SStreamTask* pTask = (SStreamTask*)pIter;
if (streamExecTask(pTask, &pTq->pVnode->msgCb, data, STREAM_DATA_TYPE_SUBMIT_BLOCK, 0) < 0) { if (streamExecTask(pTask, &pTq->pVnode->msgCb, data, STREAM_DATA_TYPE_SUBMIT_BLOCK, workerId) < 0) {
// TODO // TODO
} }
} }
return 0; return 0;
} }
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId) {
SStreamTaskExecReq req; SStreamTaskExecReq req;
tDecodeSStreamTaskExecReq(msg, &req); tDecodeSStreamTaskExecReq(msg, &req);
@ -515,7 +514,7 @@ int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen) {
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
ASSERT(pTask); ASSERT(pTask);
if (streamExecTask(pTask, &pTq->pVnode->msgCb, req.data, STREAM_DATA_TYPE_SSDATA_BLOCK, 0) < 0) { if (streamExecTask(pTask, &pTq->pVnode->msgCb, req.data, STREAM_DATA_TYPE_SSDATA_BLOCK, workerId) < 0) {
// TODO // TODO
} }
return 0; return 0;

View File

@ -167,8 +167,10 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) { if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
break; break;
} }
// TODO handle null if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) {
colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL); taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock);
return NULL;
}
} }
curRow++; curRow++;
} }

View File

@ -41,7 +41,7 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
} }
} }
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("message in fetch queue is processing"); vTrace("message in fetch queue is processing");
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
@ -69,9 +69,9 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return tqProcessPollReq(pVnode->pTq, pMsg); return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TASK_PIPE_EXEC: case TDMT_VND_TASK_PIPE_EXEC:
case TDMT_VND_TASK_MERGE_EXEC: case TDMT_VND_TASK_MERGE_EXEC:
return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen); return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, pInfo->workerId);
case TDMT_VND_STREAM_TRIGGER: case TDMT_VND_STREAM_TRIGGER:
return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen); return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, pInfo->workerId);
case TDMT_VND_QUERY_HEARTBEAT: case TDMT_VND_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
default: default:

View File

@ -184,17 +184,18 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} }
} break; } break;
case TDMT_VND_TASK_WRITE_EXEC: { case TDMT_VND_TASK_WRITE_EXEC: {
if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead),
pMsg->contLen - sizeof(SMsgHead)) < 0) { 0) < 0) {
} }
} break; } break;
case TDMT_VND_CREATE_SMA: { // timeRangeSMA case TDMT_VND_CREATE_SMA: { // timeRangeSMA
#if 1 #if 1
SSmaCfg vCreateSmaReq = {0}; SSmaCfg vCreateSmaReq = {0};
if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) { if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
vWarn("vgId%d: TDMT_VND_CREATE_SMA received but deserialize failed since %s", pVnode->config.vgId, terrstr(terrno)); vWarn("vgId%d: TDMT_VND_CREATE_SMA received but deserialize failed since %s", pVnode->config.vgId,
terrstr(terrno));
return -1; return -1;
} }
vWarn("vgId%d: TDMT_VND_CREATE_SMA received for %s:%" PRIi64, pVnode->config.vgId, vCreateSmaReq.tSma.indexName, vWarn("vgId%d: TDMT_VND_CREATE_SMA received for %s:%" PRIi64, pVnode->config.vgId, vCreateSmaReq.tSma.indexName,