diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index bdb6181884..6494e06c45 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -565,8 +565,10 @@ typedef struct {
   SArray* pVgroupInfos;  // Array of SVgroupInfo
 } SUseDbRsp;
 
-int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
+int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, const SUseDbRsp* pRsp);
 int32_t tDeserializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
+int32_t tSerializeSUseDbRspImp(SCoder* pEncoder, const SUseDbRsp* pRsp);
+int32_t tDeserializeSUseDbRspImp(SCoder* pDecoder, SUseDbRsp* pRsp);
 void    tFreeSUsedbRsp(SUseDbRsp* pRsp);
 
 typedef struct {
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 8be9bbbebd..02fe591a09 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -72,8 +72,9 @@ typedef struct {
 } STaskDispatcherFixedEp;
 
 typedef struct {
-  int8_t  hashMethod;
-  SArray* info;
+  // int8_t hashMethod;
+  char      stbFullName[TSDB_TABLE_FNAME_LEN];
+  SUseDbRsp dbInfo;
 } STaskDispatcherShuffle;
 
 typedef struct {
@@ -135,7 +136,6 @@ typedef struct {
   int8_t  sinkType;
   int8_t  dispatchType;
   int16_t dispatchMsgType;
-  int32_t downstreamTaskId;
 
   int32_t nodeId;
   SEpSet  epSet;
diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c
index f464ce6f50..c85184ffba 100644
--- a/source/common/src/tmsg.c
+++ b/source/common/src/tmsg.c
@@ -1829,7 +1829,7 @@ int32_t tDeserializeSSyncDbReq(void *buf, int32_t bufLen, SSyncDbReq *pReq) {
   return 0;
 }
 
-static int32_t tSerializeSUseDbRspImp(SCoder *pEncoder, SUseDbRsp *pRsp) {
+int32_t tSerializeSUseDbRspImp(SCoder *pEncoder, const SUseDbRsp *pRsp) {
   if (tEncodeCStr(pEncoder, pRsp->db) < 0) return -1;
   if (tEncodeI64(pEncoder, pRsp->uid) < 0) return -1;
   if (tEncodeI32(pEncoder, pRsp->vgVersion) < 0) return -1;
@@ -1848,7 +1848,7 @@ static int32_t tSerializeSUseDbRspImp(SCoder *pEncoder, SUseDbRsp *pRsp) {
   return 0;
 }
 
-int32_t tSerializeSUseDbRsp(void *buf, int32_t bufLen, SUseDbRsp *pRsp) {
+int32_t tSerializeSUseDbRsp(void *buf, int32_t bufLen, const SUseDbRsp *pRsp) {
   SCoder encoder = {0};
   tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
 
diff --git a/source/dnode/mnode/impl/inc/mndDb.h b/source/dnode/mnode/impl/inc/mndDb.h
index 125b0d3191..c0b25d74d1 100644
--- a/source/dnode/mnode/impl/inc/mndDb.h
+++ b/source/dnode/mnode/impl/inc/mndDb.h
@@ -28,6 +28,7 @@ SDbObj *mndAcquireDb(SMnode *pMnode, const char *db);
 void    mndReleaseDb(SMnode *pMnode, SDbObj *pDb);
 int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen);
 char   *mnGetDbStr(char *src);
+int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUseDbReq *pReq);
 
 #ifdef __cplusplus
 }
diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c
index 05c7d79a1a..1f629b8837 100644
--- a/source/dnode/mnode/impl/src/mndDb.c
+++ b/source/dnode/mnode/impl/src/mndDb.c
@@ -955,7 +955,6 @@ void mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode, int32_t *num) {
   sdbCancelFetch(pSdb, pIter);
 }
 
-
 static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
   int32_t vindex = 0;
   SSdb   *pSdb = pMnode->pSdb;
@@ -991,7 +990,7 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
     }
 
     sdbRelease(pSdb, pVgroup);
-    
+
     if (pDb && (vindex >= pDb->cfg.numOfVgroups)) {
       break;
     }
@@ -1000,6 +999,33 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
   sdbCancelFetch(pSdb, pIter);
 }
 
+// Fill pRsp with the name, uid, vgVersion, hash method and vgroup count of pDb.
+// The vgroup list is built when pReq is NULL, when pReq's vgVersion is older than pDb's, when its dbId
+// differs from pDb's uid, or when its numOfTable differs from the count reported by mndGetDBTableNum;
+// otherwise the list stays empty. Returns 0 on success, or -1 with terrno set if the array cannot be
+// allocated. The array is stored in pRsp->pVgroupInfos and is not released here.
+int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUseDbReq *pReq) {
+  pRsp->pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
+  if (pRsp->pVgroupInfos == NULL) {
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    return -1;
+  }
+
+  int32_t numOfTable = 0;
+  mndGetDBTableNum(pDb, pMnode, &numOfTable);
+
+  if (pReq == NULL || pReq->vgVersion < pDb->vgVersion || pReq->dbId != pDb->uid || numOfTable != pReq->numOfTable) {
+    mndBuildDBVgroupInfo(pDb, pMnode, pRsp->pVgroupInfos);
+  }
+
+  memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN);
+  pRsp->uid = pDb->uid;
+  pRsp->vgVersion = pDb->vgVersion;
+  pRsp->vgNum = taosArrayGetSize(pRsp->pVgroupInfos);
+  pRsp->hashMethod = pDb->hashMethod;
+  return 0;
+}
+
 static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
   SMnode *pMnode = pReq->pNode;
   int32_t code = -1;
@@ -1023,10 +1049,10 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
         terrno = TSDB_CODE_OUT_OF_MEMORY;
         goto USE_DB_OVER;
       }
-      
+
       mndBuildDBVgroupInfo(NULL, pMnode, usedbRsp.pVgroupInfos);
       usedbRsp.vgVersion = vgVersion++;
-      
+
       if (taosArrayGetSize(usedbRsp.pVgroupInfos) <= 0) {
         terrno = TSDB_CODE_MND_DB_NOT_EXIST;
       }
@@ -1034,7 +1060,7 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
       usedbRsp.vgVersion = usedbReq.vgVersion;
       code = 0;
     }
-    usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos);  
+    usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos);
 
     // no jump, need to construct rsp
   } else {
@@ -1057,24 +1083,10 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
         goto USE_DB_OVER;
       }
 
-      usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
-      if (usedbRsp.pVgroupInfos == NULL) {
-        terrno = TSDB_CODE_OUT_OF_MEMORY;
+      if (mndExtractDbInfo(pMnode, pDb, &usedbRsp, &usedbReq) < 0) {
         goto USE_DB_OVER;
       }
 
-      int32_t numOfTable = 0;
-      mndGetDBTableNum(pDb, pMnode, &numOfTable);
-
-      if (usedbReq.vgVersion < pDb->vgVersion || usedbReq.dbId != pDb->uid || numOfTable != usedbReq.numOfTable) {
-        mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos);
-      }
-
-      memcpy(usedbRsp.db, pDb->name, TSDB_DB_FNAME_LEN);
-      usedbRsp.uid = pDb->uid;
-      usedbRsp.vgVersion = pDb->vgVersion;
-      usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos);
-      usedbRsp.hashMethod = pDb->hashMethod;
       code = 0;
     }
   }
@@ -1138,7 +1150,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
       mndReleaseDb(pMnode, pDb);
       continue;
     }
-    
+
     usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
     if (usedbRsp.pVgroupInfos == NULL) {
       mndReleaseDb(pMnode, pDb);
@@ -1364,11 +1376,11 @@ static int32_t mndGetDbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMet
   pSchema[cols].bytes = pShow->bytes[cols];
   cols++;
 
-//  pShow->bytes[cols] = 1;
-//  pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
-//  strcpy(pSchema[cols].name, "update");
-//  pSchema[cols].bytes = pShow->bytes[cols];
-//  cols++;
+  //  pShow->bytes[cols] = 1;
+  //  pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
+  //  strcpy(pSchema[cols].name, "update");
+  //  pSchema[cols].bytes = pShow->bytes[cols];
+  //  cols++;
 
   pMeta->numOfColumns = cols;
   pShow->numOfColumns = cols;
@@ -1396,14 +1408,15 @@ char *mnGetDbStr(char *src) {
   return pos;
 }
 
-static char* getDataPosition(char* pData, SShowObj* pShow, int32_t cols, int32_t rows, int32_t capacityOfRow) {
+static char *getDataPosition(char *pData, SShowObj *pShow, int32_t cols, int32_t rows, int32_t capacityOfRow) {
   return pData + pShow->offset[cols] * capacityOfRow + pShow->bytes[cols] * rows;
 }
 
-static void dumpDbInfoToPayload(char* data, SDbObj* pDb, SShowObj* pShow, int32_t rows, int32_t rowCapacity, int64_t numOfTables) {
+static void dumpDbInfoToPayload(char *data, SDbObj *pDb, SShowObj *pShow, int32_t rows, int32_t rowCapacity,
+                                int64_t numOfTables) {
   int32_t cols = 0;
 
-  char* pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
+  char *pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
   char *name = mnGetDbStr(pDb->name);
   if (name != NULL) {
     STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]);
@@ -1497,20 +1510,20 @@ static void dumpDbInfoToPayload(char* data, SDbObj* pDb, SShowObj* pShow, int32_
   STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2);
   cols++;
 
-//  pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
-//  *(int8_t *)pWrite = pDb->cfg.update;
+  //  pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
+  //  *(int8_t *)pWrite = pDb->cfg.update;
 }
 
-static void setInformationSchemaDbCfg(SDbObj* pDbObj) {
+static void setInformationSchemaDbCfg(SDbObj *pDbObj) {
   ASSERT(pDbObj != NULL);
   strncpy(pDbObj->name, TSDB_INFORMATION_SCHEMA_DB, tListLen(pDbObj->name));
 
-  pDbObj->createdTime = 0;
+  pDbObj->createdTime      = 0;
   pDbObj->cfg.numOfVgroups = 0;
-  pDbObj->cfg.quorum = 1;
+  pDbObj->cfg.quorum       = 1;
   pDbObj->cfg.replications = 1;
-  pDbObj->cfg.update = 1;
-  pDbObj->cfg.precision = TSDB_TIME_PRECISION_MILLI;
+  pDbObj->cfg.update       = 1;
+  pDbObj->cfg.precision    = TSDB_TIME_PRECISION_MILLI;
 }
 
 static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rowsCapacity) {
diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c
index 697811cd04..305912a72a 100644
--- a/source/dnode/mnode/impl/src/mndScheduler.c
+++ b/source/dnode/mnode/impl/src/mndScheduler.c
@@ -222,8 +222,19 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i
       /*pTask->sinkType = TASK_SINK__NONE;*/
 
       // dispatch part
+      pTask->dispatchType = TASK_DISPATCH__NONE;
+#if 0
       pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
       pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
+      SDbObj* pDb = mndAcquireDb(pMnode, pStream->db);
+      ASSERT(pDb);
+      if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
+        sdbRelease(pSdb, pDb);
+        qDestroyQueryPlan(pPlan);
+        return -1;
+      }
+      sdbRelease(pSdb, pDb);
+#endif
 
       // exec part
       pTask->execType = TASK_EXEC__MERGE;
diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c
index 028e310a25..eb4cd2f97d 100644
--- a/source/libs/stream/src/tstream.c
+++ b/source/libs/stream/src/tstream.c
@@ -16,12 +16,123 @@
 #include "tstream.h"
 #include "executor.h"
 
+// Build the RPC message that forwards `data` to the task downstream of pTask.
+//
+//   pTask    task whose dispatchType selects the routing: in-place, fixed endpoint or shuffle
+//   data     array of SSDataBlock to serialize; for shuffle dispatch every block must share one groupId
+//   pMsg     out: serialized request; pMsg->pCont is allocated with rpcMallocCont
+//   ppEpSet  out: endpoint set of the destination; not written for in-place dispatch, where it may be NULL
+//
+// Returns 0 on success, -1 when no destination can be resolved or the buffer cannot be allocated.
+// The destination is resolved before the buffer is allocated, so no failure path leaks the buffer.
+static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
+  SStreamTaskExecReq req = {
+      .streamId = pTask->streamId,
+      .data = data,
+  };
+  int32_t vgId = 0;  // network byte order; stays 0 for in-place dispatch
+
+  if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
+    req.taskId = pTask->inplaceDispatcher.taskId;
+  } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
+    vgId = htonl(pTask->fixedEpDispatcher.nodeId);
+    *ppEpSet = &pTask->fixedEpDispatcher.epSet;
+    req.taskId = pTask->fixedEpDispatcher.taskId;
+  } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
+    // TODO(review): req.taskId stays 0 here; STaskDispatcherShuffle carries no downstream task id
+    if (taosArrayGetSize(data) == 0) {
+      return -1;
+    }
+
+    // all groupId must be the same in an array, so the first block decides the destination
+    SSDataBlock* pBlock = taosArrayGet(data, 0);
+
+    // room for stbFullName, ':', a signed 64-bit decimal (at most 20 characters) and the NUL
+    char    ctbName[TSDB_TABLE_FNAME_LEN + 22];
+    int32_t len = snprintf(ctbName, sizeof(ctbName), "%s:%lld", pTask->shuffleDispatcher.stbFullName,
+                           (long long)pBlock->info.groupId);
+    if (len < 0 || len >= (int32_t)sizeof(ctbName)) {
+      return -1;
+    }
+
+    // TODO: get hash function by hashMethod
+    uint32_t hashValue = MurmurHash3_32(ctbName, len);
+
+    // find the vgroup whose [hashBegin, hashEnd] range covers the hash value
+    // TODO: optimize search process
+    int32_t nodeId = 0;
+    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
+    int32_t sz = taosArrayGetSize(vgInfo);
+    for (int32_t i = 0; i < sz; i++) {
+      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
+      if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
+        nodeId = pVgInfo->vgId;
+        *ppEpSet = &pVgInfo->epSet;
+        break;
+      }
+    }
+    if (nodeId == 0) {
+      // no vgroup covers the hash value: fail rather than send to an unset endpoint
+      return -1;
+    }
+    vgId = htonl(nodeId);
+  }
+
+  int32_t tlen = sizeof(SMsgHead) + tEncodeSStreamTaskExecReq(NULL, &req);
+  void*   buf = rpcMallocCont(tlen);
+  if (buf == NULL) {
+    return -1;
+  }
+  ((SMsgHead*)buf)->vgId = vgId;
+
+  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
+  tEncodeSStreamTaskExecReq(&abuf, &req);
+
+  pMsg->pCont = buf;
+  pMsg->contLen = tlen;
+  pMsg->code = 0;
+  pMsg->msgType = pTask->dispatchMsgType;
+
+  return 0;
+}
+
+// Send one dispatch message per entry of `data`, a hash that maps groupId to an SArray* of SSDataBlock.
+// Returns 0 once every group has been sent, -1 as soon as one message cannot be built.
+static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashObj* data) {
+  void* pIter = NULL;
+  while ((pIter = taosHashIterate(data, pIter)) != NULL) {
+    // the stored value is the SArray* itself, so the iterator points at that pointer
+    SArray* pData = *(SArray**)pIter;
+    SRpcMsg dispatchMsg = {0};
+    SEpSet* pEpSet = NULL;
+    if (streamBuildDispatchMsg(pTask, pData, &dispatchMsg, &pEpSet) < 0) {
+      // the iteration stops early, so the iterator has to be released explicitly
+      taosHashCancelIterate(data, pIter);
+      ASSERT(0);
+      return -1;
+    }
+    tmsgSendReq(pMsgCb, pEpSet, &dispatchMsg);
+  }
+  return 0;
+}
+
+// Destroy the per-group arrays stored in a groupId -> SArray* hash, then the hash itself.
+// Only the array containers are released; the blocks in them are shallow copies.
+static void streamFreeShuffleRes(SHashObj* pShuffleRes) {
+  void* pIter = NULL;
+  while ((pIter = taosHashIterate(pShuffleRes, pIter)) != NULL) {
+    taosArrayDestroy(*(SArray**)pIter);
+  }
+  taosHashCleanup(pShuffleRes);
+}
+
 int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId) {
   SArray* pRes = NULL;
   // source
   if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK && pTask->sourceType != TASK_SOURCE__SCAN) return 0;
 
   // exec
+  // TODO: for shuffle dispatcher, merge data by groupId
   if (pTask->execType != TASK_EXEC__NONE) {
     ASSERT(workId < pTask->exec.numOfRunners);
     void* exec = pTask->exec.runners[workId].executor;
@@ -83,28 +194,13 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
   }
 
   // dispatch
+
   if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
-    SStreamTaskExecReq req = {
-        .streamId = pTask->streamId,
-        .taskId = pTask->taskId,
-        .data = pRes,
-    };
-
-    int32_t tlen = sizeof(SMsgHead) + tEncodeSStreamTaskExecReq(NULL, &req);
-    void*   buf = rpcMallocCont(tlen);
-
-    if (buf == NULL) {
+    SRpcMsg dispatchMsg = {0};
+    if (streamBuildDispatchMsg(pTask, pRes, &dispatchMsg, NULL) < 0) {
+      ASSERT(0);
       return -1;
     }
-    void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
-    tEncodeSStreamTaskExecReq(&abuf, &req);
-
-    SRpcMsg dispatchMsg = {
-        .pCont = buf,
-        .contLen = tlen,
-        .code = 0,
-        .msgType = pTask->dispatchMsgType,
-    };
 
     int32_t qType;
     if (pTask->dispatchMsgType == TDMT_VND_TASK_PIPE_EXEC || pTask->dispatchMsgType == TDMT_SND_TASK_PIPE_EXEC) {
@@ -120,36 +216,57 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
     tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);
 
   } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
-    SStreamTaskExecReq req = {
-        .streamId = pTask->streamId,
-        .taskId = pTask->fixedEpDispatcher.taskId,
-        .data = pRes,
-    };
-
-    int32_t tlen = sizeof(SMsgHead) + tEncodeSStreamTaskExecReq(NULL, &req);
-    void*   buf = rpcMallocCont(tlen);
-
-    if (buf == NULL) {
+    SRpcMsg dispatchMsg = {0};
+    SEpSet* pEpSet = NULL;
+    if (streamBuildDispatchMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) {
+      ASSERT(0);
       return -1;
     }
 
-    ((SMsgHead*)buf)->vgId = htonl(pTask->fixedEpDispatcher.nodeId);
-    void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
-    tEncodeSStreamTaskExecReq(&abuf, &req);
-
-    SRpcMsg dispatchMsg = {
-        .pCont = buf,
-        .contLen = tlen,
-        .code = 0,
-        .msgType = pTask->dispatchMsgType,
-    };
-
-    SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;
-
     tmsgSendReq(pMsgCb, pEpSet, &dispatchMsg);
 
   } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
-    // TODO
+    // bucket the result blocks by groupId: each hash value is an SArray* of shallow-copied SSDataBlock
+    SHashObj* pShuffleRes = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
+    if (pShuffleRes == NULL) {
+      return -1;
+    }
+
+    int32_t shuffleCode = 0;
+    int32_t sz = taosArrayGetSize(pRes);
+    for (int32_t i = 0; i < sz; i++) {
+      SSDataBlock* pDataBlock = taosArrayGet(pRes, i);
+      // a lookup returns the address of the stored SArray*, not the array itself
+      SArray** ppArray = taosHashGet(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t));
+      SArray*  pArray = (ppArray != NULL) ? *ppArray : NULL;
+      if (pArray == NULL) {
+        pArray = taosArrayInit(0, sizeof(SSDataBlock));
+        if (pArray == NULL) {
+          shuffleCode = -1;
+          break;
+        }
+        if (taosHashPut(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t), &pArray, sizeof(void*)) != 0) {
+          // the hash did not take the array, so it must be released here
+          taosArrayDestroy(pArray);
+          shuffleCode = -1;
+          break;
+        }
+      }
+      if (taosArrayPush(pArray, pDataBlock) == NULL) {
+        shuffleCode = -1;
+        break;
+      }
+    }
+
+    if (shuffleCode == 0 && streamShuffleDispatch(pTask, pMsgCb, pShuffleRes) < 0) {
+      shuffleCode = -1;
+    }
+
+    // the grouping containers are only needed while the messages are being built
+    streamFreeShuffleRes(pShuffleRes);
+    if (shuffleCode < 0) {
+      return -1;
+    }
   } else {
     ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
 
@@ -196,7 +313,6 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
   if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1;
   if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1;
   if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
-  if (tEncodeI32(pEncoder, pTask->downstreamTaskId) < 0) return -1;
 
   if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
   if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1;
@@ -225,7 +341,9 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
     if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1;
     if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
   } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
-    if (tEncodeI8(pEncoder, pTask->shuffleDispatcher.hashMethod) < 0) return -1;
+    if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
+    if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
+    /*if (tEncodeI8(pEncoder, pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/
   }
 
   /*tEndEncode(pEncoder);*/
@@ -242,7 +360,6 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
   if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1;
   if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1;
   if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
-  if (tDecodeI32(pDecoder, &pTask->downstreamTaskId) < 0) return -1;
 
   if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
   if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1;
@@ -271,7 +388,9 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
     if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1;
     if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
   } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
-    if (tDecodeI8(pDecoder, &pTask->shuffleDispatcher.hashMethod) < 0) return -1;
+    /*if (tDecodeI8(pDecoder, &pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/
+    if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
+    if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
   }
 
   /*tEndDecode(pDecoder);*/