stream add shuffle dispatcher
This commit is contained in:
parent
9a08ac3c5f
commit
b27fa6c8e3
|
@ -565,8 +565,10 @@ typedef struct {
|
||||||
SArray* pVgroupInfos; // Array of SVgroupInfo
|
SArray* pVgroupInfos; // Array of SVgroupInfo
|
||||||
} SUseDbRsp;
|
} SUseDbRsp;
|
||||||
|
|
||||||
int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
|
int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, const SUseDbRsp* pRsp);
|
||||||
int32_t tDeserializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
|
int32_t tDeserializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
|
||||||
|
int32_t tSerializeSUseDbRspImp(SCoder* pEncoder, const SUseDbRsp* pRsp);
|
||||||
|
int32_t tDeserializeSUseDbRspImp(SCoder* pDecoder, SUseDbRsp* pRsp);
|
||||||
void tFreeSUsedbRsp(SUseDbRsp* pRsp);
|
void tFreeSUsedbRsp(SUseDbRsp* pRsp);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -72,8 +72,9 @@ typedef struct {
|
||||||
} STaskDispatcherFixedEp;
|
} STaskDispatcherFixedEp;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t hashMethod;
|
// int8_t hashMethod;
|
||||||
SArray* info;
|
char stbFullName[TSDB_TABLE_FNAME_LEN];
|
||||||
|
SUseDbRsp dbInfo;
|
||||||
} STaskDispatcherShuffle;
|
} STaskDispatcherShuffle;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -135,7 +136,6 @@ typedef struct {
|
||||||
int8_t sinkType;
|
int8_t sinkType;
|
||||||
int8_t dispatchType;
|
int8_t dispatchType;
|
||||||
int16_t dispatchMsgType;
|
int16_t dispatchMsgType;
|
||||||
int32_t downstreamTaskId;
|
|
||||||
|
|
||||||
int32_t nodeId;
|
int32_t nodeId;
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
|
|
|
@ -1829,7 +1829,7 @@ int32_t tDeserializeSSyncDbReq(void *buf, int32_t bufLen, SSyncDbReq *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tSerializeSUseDbRspImp(SCoder *pEncoder, SUseDbRsp *pRsp) {
|
int32_t tSerializeSUseDbRspImp(SCoder *pEncoder, const SUseDbRsp *pRsp) {
|
||||||
if (tEncodeCStr(pEncoder, pRsp->db) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pRsp->db) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pRsp->uid) < 0) return -1;
|
if (tEncodeI64(pEncoder, pRsp->uid) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pRsp->vgVersion) < 0) return -1;
|
if (tEncodeI32(pEncoder, pRsp->vgVersion) < 0) return -1;
|
||||||
|
@ -1848,7 +1848,7 @@ static int32_t tSerializeSUseDbRspImp(SCoder *pEncoder, SUseDbRsp *pRsp) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tSerializeSUseDbRsp(void *buf, int32_t bufLen, SUseDbRsp *pRsp) {
|
int32_t tSerializeSUseDbRsp(void *buf, int32_t bufLen, const SUseDbRsp *pRsp) {
|
||||||
SCoder encoder = {0};
|
SCoder encoder = {0};
|
||||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,7 @@ SDbObj *mndAcquireDb(SMnode *pMnode, const char *db);
|
||||||
void mndReleaseDb(SMnode *pMnode, SDbObj *pDb);
|
void mndReleaseDb(SMnode *pMnode, SDbObj *pDb);
|
||||||
int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen);
|
int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen);
|
||||||
char *mnGetDbStr(char *src);
|
char *mnGetDbStr(char *src);
|
||||||
|
int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUseDbReq *pReq);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -955,7 +955,6 @@ void mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode, int32_t *num) {
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
|
static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
|
||||||
int32_t vindex = 0;
|
int32_t vindex = 0;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
@ -991,7 +990,7 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
|
||||||
if (pDb && (vindex >= pDb->cfg.numOfVgroups)) {
|
if (pDb && (vindex >= pDb->cfg.numOfVgroups)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1000,6 +999,28 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUseDbReq *pReq) {
|
||||||
|
pRsp->pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
|
||||||
|
if (pRsp->pVgroupInfos == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfTable = 0;
|
||||||
|
mndGetDBTableNum(pDb, pMnode, &numOfTable);
|
||||||
|
|
||||||
|
if (pReq == NULL || pReq->vgVersion < pDb->vgVersion || pReq->dbId != pDb->uid || numOfTable != pReq->numOfTable) {
|
||||||
|
mndBuildDBVgroupInfo(pDb, pMnode, pRsp->pVgroupInfos);
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
|
pRsp->uid = pDb->uid;
|
||||||
|
pRsp->vgVersion = pDb->vgVersion;
|
||||||
|
pRsp->vgNum = taosArrayGetSize(pRsp->pVgroupInfos);
|
||||||
|
pRsp->hashMethod = pDb->hashMethod;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
|
static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->pNode;
|
SMnode *pMnode = pReq->pNode;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
@ -1023,10 +1044,10 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto USE_DB_OVER;
|
goto USE_DB_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
mndBuildDBVgroupInfo(NULL, pMnode, usedbRsp.pVgroupInfos);
|
mndBuildDBVgroupInfo(NULL, pMnode, usedbRsp.pVgroupInfos);
|
||||||
usedbRsp.vgVersion = vgVersion++;
|
usedbRsp.vgVersion = vgVersion++;
|
||||||
|
|
||||||
if (taosArrayGetSize(usedbRsp.pVgroupInfos) <= 0) {
|
if (taosArrayGetSize(usedbRsp.pVgroupInfos) <= 0) {
|
||||||
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
||||||
}
|
}
|
||||||
|
@ -1034,7 +1055,7 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
|
||||||
usedbRsp.vgVersion = usedbReq.vgVersion;
|
usedbRsp.vgVersion = usedbReq.vgVersion;
|
||||||
code = 0;
|
code = 0;
|
||||||
}
|
}
|
||||||
usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos);
|
usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos);
|
||||||
|
|
||||||
// no jump, need to construct rsp
|
// no jump, need to construct rsp
|
||||||
} else {
|
} else {
|
||||||
|
@ -1057,24 +1078,10 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
|
||||||
goto USE_DB_OVER;
|
goto USE_DB_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
|
if (mndExtractDbInfo(pMnode, pDb, &usedbRsp, &usedbReq) < 0) {
|
||||||
if (usedbRsp.pVgroupInfos == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto USE_DB_OVER;
|
goto USE_DB_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfTable = 0;
|
|
||||||
mndGetDBTableNum(pDb, pMnode, &numOfTable);
|
|
||||||
|
|
||||||
if (usedbReq.vgVersion < pDb->vgVersion || usedbReq.dbId != pDb->uid || numOfTable != usedbReq.numOfTable) {
|
|
||||||
mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos);
|
|
||||||
}
|
|
||||||
|
|
||||||
memcpy(usedbRsp.db, pDb->name, TSDB_DB_FNAME_LEN);
|
|
||||||
usedbRsp.uid = pDb->uid;
|
|
||||||
usedbRsp.vgVersion = pDb->vgVersion;
|
|
||||||
usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos);
|
|
||||||
usedbRsp.hashMethod = pDb->hashMethod;
|
|
||||||
code = 0;
|
code = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1138,7 +1145,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
|
||||||
mndReleaseDb(pMnode, pDb);
|
mndReleaseDb(pMnode, pDb);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
|
usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
|
||||||
if (usedbRsp.pVgroupInfos == NULL) {
|
if (usedbRsp.pVgroupInfos == NULL) {
|
||||||
mndReleaseDb(pMnode, pDb);
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
@ -1364,11 +1371,11 @@ static int32_t mndGetDbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMet
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
pSchema[cols].bytes = pShow->bytes[cols];
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
// pShow->bytes[cols] = 1;
|
// pShow->bytes[cols] = 1;
|
||||||
// pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
|
// pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
|
||||||
// strcpy(pSchema[cols].name, "update");
|
// strcpy(pSchema[cols].name, "update");
|
||||||
// pSchema[cols].bytes = pShow->bytes[cols];
|
// pSchema[cols].bytes = pShow->bytes[cols];
|
||||||
// cols++;
|
// cols++;
|
||||||
|
|
||||||
pMeta->numOfColumns = cols;
|
pMeta->numOfColumns = cols;
|
||||||
pShow->numOfColumns = cols;
|
pShow->numOfColumns = cols;
|
||||||
|
@ -1396,14 +1403,15 @@ char *mnGetDbStr(char *src) {
|
||||||
return pos;
|
return pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
static char* getDataPosition(char* pData, SShowObj* pShow, int32_t cols, int32_t rows, int32_t capacityOfRow) {
|
static char *getDataPosition(char *pData, SShowObj *pShow, int32_t cols, int32_t rows, int32_t capacityOfRow) {
|
||||||
return pData + pShow->offset[cols] * capacityOfRow + pShow->bytes[cols] * rows;
|
return pData + pShow->offset[cols] * capacityOfRow + pShow->bytes[cols] * rows;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dumpDbInfoToPayload(char* data, SDbObj* pDb, SShowObj* pShow, int32_t rows, int32_t rowCapacity, int64_t numOfTables) {
|
static void dumpDbInfoToPayload(char *data, SDbObj *pDb, SShowObj *pShow, int32_t rows, int32_t rowCapacity,
|
||||||
|
int64_t numOfTables) {
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
|
|
||||||
char* pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
char *pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
char *name = mnGetDbStr(pDb->name);
|
char *name = mnGetDbStr(pDb->name);
|
||||||
if (name != NULL) {
|
if (name != NULL) {
|
||||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]);
|
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]);
|
||||||
|
@ -1497,20 +1505,20 @@ static void dumpDbInfoToPayload(char* data, SDbObj* pDb, SShowObj* pShow, int32_
|
||||||
STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2);
|
STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2);
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
// pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
// pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
// *(int8_t *)pWrite = pDb->cfg.update;
|
// *(int8_t *)pWrite = pDb->cfg.update;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setInformationSchemaDbCfg(SDbObj* pDbObj) {
|
static void setInformationSchemaDbCfg(SDbObj *pDbObj) {
|
||||||
ASSERT(pDbObj != NULL);
|
ASSERT(pDbObj != NULL);
|
||||||
strncpy(pDbObj->name, TSDB_INFORMATION_SCHEMA_DB, tListLen(pDbObj->name));
|
strncpy(pDbObj->name, TSDB_INFORMATION_SCHEMA_DB, tListLen(pDbObj->name));
|
||||||
|
|
||||||
pDbObj->createdTime = 0;
|
pDbObj->createdTime = 0;
|
||||||
pDbObj->cfg.numOfVgroups = 0;
|
pDbObj->cfg.numOfVgroups = 0;
|
||||||
pDbObj->cfg.quorum = 1;
|
pDbObj->cfg.quorum = 1;
|
||||||
pDbObj->cfg.replications = 1;
|
pDbObj->cfg.replications = 1;
|
||||||
pDbObj->cfg.update = 1;
|
pDbObj->cfg.update = 1;
|
||||||
pDbObj->cfg.precision = TSDB_TIME_PRECISION_MILLI;
|
pDbObj->cfg.precision = TSDB_TIME_PRECISION_MILLI;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rowsCapacity) {
|
static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rowsCapacity) {
|
||||||
|
|
|
@ -222,8 +222,19 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i
|
||||||
/*pTask->sinkType = TASK_SINK__NONE;*/
|
/*pTask->sinkType = TASK_SINK__NONE;*/
|
||||||
|
|
||||||
// dispatch part
|
// dispatch part
|
||||||
|
pTask->dispatchType = TASK_DISPATCH__NONE;
|
||||||
|
#if 0
|
||||||
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
|
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
|
||||||
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
|
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
|
||||||
|
SDbObj* pDb = mndAcquireDb(pMnode, pStream->db);
|
||||||
|
ASSERT(pDb);
|
||||||
|
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
|
||||||
|
sdbRelease(pSdb, pDb);
|
||||||
|
qDestroyQueryPlan(pPlan);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
sdbRelease(pSdb, pDb);
|
||||||
|
#endif
|
||||||
|
|
||||||
// exec part
|
// exec part
|
||||||
pTask->execType = TASK_EXEC__MERGE;
|
pTask->execType = TASK_EXEC__MERGE;
|
||||||
|
|
|
@ -16,12 +16,88 @@
|
||||||
#include "tstream.h"
|
#include "tstream.h"
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
|
|
||||||
|
static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
|
||||||
|
SStreamTaskExecReq req = {
|
||||||
|
.streamId = pTask->streamId,
|
||||||
|
.data = data,
|
||||||
|
};
|
||||||
|
|
||||||
|
int32_t tlen = sizeof(SMsgHead) + tEncodeSStreamTaskExecReq(NULL, &req);
|
||||||
|
void* buf = rpcMallocCont(tlen);
|
||||||
|
|
||||||
|
if (buf == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
|
||||||
|
((SMsgHead*)buf)->vgId = 0;
|
||||||
|
req.taskId = pTask->inplaceDispatcher.taskId;
|
||||||
|
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
|
||||||
|
((SMsgHead*)buf)->vgId = htonl(pTask->fixedEpDispatcher.nodeId);
|
||||||
|
*ppEpSet = &pTask->fixedEpDispatcher.epSet;
|
||||||
|
req.taskId = pTask->fixedEpDispatcher.taskId;
|
||||||
|
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
||||||
|
int32_t nodeId = 0;
|
||||||
|
char ctbName[TSDB_TABLE_FNAME_LEN];
|
||||||
|
// all groupId must be the same in an array
|
||||||
|
SSDataBlock* pBlock = taosArrayGet(data, 0);
|
||||||
|
sprintf(ctbName, "%s:%ld", pTask->shuffleDispatcher.stbFullName, pBlock->info.groupId);
|
||||||
|
|
||||||
|
// TODO: get hash function by hashMethod
|
||||||
|
|
||||||
|
// get groupId, compute hash value
|
||||||
|
uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));
|
||||||
|
//
|
||||||
|
// get node
|
||||||
|
// TODO: optimize search process
|
||||||
|
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
|
int32_t sz = taosArrayGetSize(vgInfo);
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
|
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
|
||||||
|
nodeId = pVgInfo->vgId;
|
||||||
|
*ppEpSet = &pVgInfo->epSet;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ASSERT(nodeId != 0);
|
||||||
|
((SMsgHead*)buf)->vgId = htonl(nodeId);
|
||||||
|
}
|
||||||
|
|
||||||
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
tEncodeSStreamTaskExecReq(&abuf, &req);
|
||||||
|
|
||||||
|
pMsg->pCont = buf;
|
||||||
|
pMsg->contLen = tlen;
|
||||||
|
pMsg->code = 0;
|
||||||
|
pMsg->msgType = pTask->dispatchMsgType;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashObj* data) {
|
||||||
|
void* pIter = NULL;
|
||||||
|
while (1) {
|
||||||
|
pIter = taosHashIterate(data, pIter);
|
||||||
|
if (pIter == NULL) return 0;
|
||||||
|
SArray* pData = (SArray*)pIter;
|
||||||
|
SRpcMsg dispatchMsg = {0};
|
||||||
|
SEpSet* pEpSet;
|
||||||
|
if (streamBuildDispatchMsg(pTask, pData, &dispatchMsg, &pEpSet) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
tmsgSendReq(pMsgCb, pEpSet, &dispatchMsg);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId) {
|
int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId) {
|
||||||
SArray* pRes = NULL;
|
SArray* pRes = NULL;
|
||||||
// source
|
// source
|
||||||
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK && pTask->sourceType != TASK_SOURCE__SCAN) return 0;
|
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK && pTask->sourceType != TASK_SOURCE__SCAN) return 0;
|
||||||
|
|
||||||
// exec
|
// exec
|
||||||
|
// TODO: for shuffle dispatcher, merge data by groupId
|
||||||
if (pTask->execType != TASK_EXEC__NONE) {
|
if (pTask->execType != TASK_EXEC__NONE) {
|
||||||
ASSERT(workId < pTask->exec.numOfRunners);
|
ASSERT(workId < pTask->exec.numOfRunners);
|
||||||
void* exec = pTask->exec.runners[workId].executor;
|
void* exec = pTask->exec.runners[workId].executor;
|
||||||
|
@ -83,28 +159,13 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
|
||||||
}
|
}
|
||||||
|
|
||||||
// dispatch
|
// dispatch
|
||||||
|
|
||||||
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
|
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
|
||||||
SStreamTaskExecReq req = {
|
SRpcMsg dispatchMsg = {0};
|
||||||
.streamId = pTask->streamId,
|
if (streamBuildDispatchMsg(pTask, pRes, &dispatchMsg, NULL) < 0) {
|
||||||
.taskId = pTask->taskId,
|
ASSERT(0);
|
||||||
.data = pRes,
|
|
||||||
};
|
|
||||||
|
|
||||||
int32_t tlen = sizeof(SMsgHead) + tEncodeSStreamTaskExecReq(NULL, &req);
|
|
||||||
void* buf = rpcMallocCont(tlen);
|
|
||||||
|
|
||||||
if (buf == NULL) {
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
|
||||||
tEncodeSStreamTaskExecReq(&abuf, &req);
|
|
||||||
|
|
||||||
SRpcMsg dispatchMsg = {
|
|
||||||
.pCont = buf,
|
|
||||||
.contLen = tlen,
|
|
||||||
.code = 0,
|
|
||||||
.msgType = pTask->dispatchMsgType,
|
|
||||||
};
|
|
||||||
|
|
||||||
int32_t qType;
|
int32_t qType;
|
||||||
if (pTask->dispatchMsgType == TDMT_VND_TASK_PIPE_EXEC || pTask->dispatchMsgType == TDMT_SND_TASK_PIPE_EXEC) {
|
if (pTask->dispatchMsgType == TDMT_VND_TASK_PIPE_EXEC || pTask->dispatchMsgType == TDMT_SND_TASK_PIPE_EXEC) {
|
||||||
|
@ -120,36 +181,38 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
|
||||||
tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);
|
tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);
|
||||||
|
|
||||||
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
|
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
|
||||||
SStreamTaskExecReq req = {
|
SRpcMsg dispatchMsg = {0};
|
||||||
.streamId = pTask->streamId,
|
SEpSet* pEpSet = NULL;
|
||||||
.taskId = pTask->fixedEpDispatcher.taskId,
|
if (streamBuildDispatchMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) {
|
||||||
.data = pRes,
|
ASSERT(0);
|
||||||
};
|
|
||||||
|
|
||||||
int32_t tlen = sizeof(SMsgHead) + tEncodeSStreamTaskExecReq(NULL, &req);
|
|
||||||
void* buf = rpcMallocCont(tlen);
|
|
||||||
|
|
||||||
if (buf == NULL) {
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
((SMsgHead*)buf)->vgId = htonl(pTask->fixedEpDispatcher.nodeId);
|
|
||||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
|
||||||
tEncodeSStreamTaskExecReq(&abuf, &req);
|
|
||||||
|
|
||||||
SRpcMsg dispatchMsg = {
|
|
||||||
.pCont = buf,
|
|
||||||
.contLen = tlen,
|
|
||||||
.code = 0,
|
|
||||||
.msgType = pTask->dispatchMsgType,
|
|
||||||
};
|
|
||||||
|
|
||||||
SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;
|
|
||||||
|
|
||||||
tmsgSendReq(pMsgCb, pEpSet, &dispatchMsg);
|
tmsgSendReq(pMsgCb, pEpSet, &dispatchMsg);
|
||||||
|
|
||||||
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
||||||
// TODO
|
SHashObj* pShuffleRes = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||||
|
if (pShuffleRes == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t sz = taosArrayGetSize(pRes);
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
SSDataBlock* pDataBlock = taosArrayGet(pRes, i);
|
||||||
|
SArray* pArray = taosHashGet(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t));
|
||||||
|
if (pArray == NULL) {
|
||||||
|
pArray = taosArrayInit(0, sizeof(SSDataBlock));
|
||||||
|
if (pArray == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taosHashPut(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t), &pArray, sizeof(void*));
|
||||||
|
}
|
||||||
|
taosArrayPush(pArray, pDataBlock);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (streamShuffleDispatch(pTask, pMsgCb, pShuffleRes) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
||||||
|
@ -196,7 +259,6 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
|
||||||
if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1;
|
if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1;
|
if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1;
|
||||||
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
|
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pTask->downstreamTaskId) < 0) return -1;
|
|
||||||
|
|
||||||
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
|
||||||
if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1;
|
if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1;
|
||||||
|
@ -225,7 +287,8 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
|
||||||
if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1;
|
||||||
if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
|
if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
|
||||||
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
||||||
if (tEncodeI8(pEncoder, pTask->shuffleDispatcher.hashMethod) < 0) return -1;
|
if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
|
||||||
|
/*if (tEncodeI8(pEncoder, pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/
|
||||||
}
|
}
|
||||||
|
|
||||||
/*tEndEncode(pEncoder);*/
|
/*tEndEncode(pEncoder);*/
|
||||||
|
@ -242,7 +305,6 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
|
||||||
if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1;
|
||||||
if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1;
|
||||||
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
|
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pTask->downstreamTaskId) < 0) return -1;
|
|
||||||
|
|
||||||
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
|
||||||
if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1;
|
if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1;
|
||||||
|
@ -271,7 +333,8 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
|
||||||
if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1;
|
||||||
if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
|
if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
|
||||||
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
||||||
if (tDecodeI8(pDecoder, &pTask->shuffleDispatcher.hashMethod) < 0) return -1;
|
/*if (tDecodeI8(pDecoder, &pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/
|
||||||
|
if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*tEndDecode(pDecoder);*/
|
/*tEndDecode(pDecoder);*/
|
||||||
|
|
Loading…
Reference in New Issue