add parallel

This commit is contained in:
Liu Jicong 2022-03-21 16:47:59 +08:00
parent 70cddceb60
commit 1751c0285a
3 changed files with 10 additions and 3 deletions

View File

@ -2275,7 +2275,7 @@ enum {
typedef struct { typedef struct {
void* inputHandle; void* inputHandle;
void** executor; void* executor[4];
} SStreamTaskParRunner; } SStreamTaskParRunner;
typedef struct { typedef struct {

View File

@ -2718,6 +2718,8 @@ int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) {
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->level) < 0) return -1; if (tEncodeI32(pEncoder, pTask->level) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->status) < 0) return -1; if (tEncodeI8(pEncoder, pTask->status) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->pipeEnd) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->parallel) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->NextOpEp) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->NextOpEp) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1;
tEndEncode(pEncoder); tEndEncode(pEncoder);
@ -2730,6 +2732,8 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) {
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->pipeEnd) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->parallel) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->NextOpEp) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->NextOpEp) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1;
tEndDecode(pDecoder); tEndDecode(pDecoder);

View File

@ -57,7 +57,9 @@ void sndMetaDelete(SStreamMeta *pMeta) {
} }
int32_t sndMetaDeployTask(SStreamMeta *pMeta, SStreamTask *pTask) { int32_t sndMetaDeployTask(SStreamMeta *pMeta, SStreamTask *pTask) {
pTask->runner.executor = qCreateStreamExecTaskInfo(pTask->qmsg, NULL); for (int i = 0; i < pTask->parallel; i++) {
pTask->runner.executor[i] = qCreateStreamExecTaskInfo(pTask->qmsg, NULL);
}
return taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), pTask, sizeof(void *)); return taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), pTask, sizeof(void *));
} }
@ -95,6 +97,7 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
SStreamTask *pTask = malloc(sizeof(SStreamTask)); SStreamTask *pTask = malloc(sizeof(SStreamTask));
if (pTask == NULL) { if (pTask == NULL) {
ASSERT(0); ASSERT(0);
return;
} }
SCoder decoder; SCoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, msg, pMsg->contLen - sizeof(SMsgHead), TD_DECODER); tCoderInit(&decoder, TD_LITTLE_ENDIAN, msg, pMsg->contLen - sizeof(SMsgHead), TD_DECODER);