refactor stream
This commit is contained in:
parent
03b2162309
commit
564236f230
|
@ -138,7 +138,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
|
||||||
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
|
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
|
||||||
void* blockDataDestroy(SSDataBlock* pBlock);
|
void* blockDataDestroy(SSDataBlock* pBlock);
|
||||||
|
|
||||||
void blockDebugShowData(SArray* dataBlocks);
|
void blockDebugShowData(const SArray* dataBlocks);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,16 +30,18 @@ enum {
|
||||||
STREAM_TASK_STATUS__STOP,
|
STREAM_TASK_STATUS__STOP,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#if 0
|
||||||
// pipe -> fetch/pipe queue
|
// pipe -> fetch/pipe queue
|
||||||
// merge -> merge queue
|
// merge -> merge queue
|
||||||
// write -> write queue
|
// write -> write queue
|
||||||
enum {
|
enum {
|
||||||
TASK_SINK_MSG__SND_PIPE = 1,
|
TASK_DISPATCH_MSG__SND_PIPE = 1,
|
||||||
TASK_SINK_MSG__SND_MERGE,
|
TASK_DISPATCH_MSG__SND_MERGE,
|
||||||
TASK_SINK_MSG__VND_PIPE,
|
TASK_DISPATCH_MSG__VND_PIPE,
|
||||||
TASK_SINK_MSG__VND_MERGE,
|
TASK_DISPATCH_MSG__VND_MERGE,
|
||||||
TASK_SINK_MSG__VND_WRITE,
|
TASK_DISPATCH_MSG__VND_WRITE,
|
||||||
};
|
};
|
||||||
|
#endif
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t nodeId; // 0 for snode
|
int32_t nodeId; // 0 for snode
|
||||||
|
@ -93,13 +95,14 @@ typedef struct {
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
TASK_SOURCE__SCAN = 1,
|
TASK_SOURCE__SCAN = 1,
|
||||||
TASK_SOURCE__SINGLE,
|
TASK_SOURCE__PIPE,
|
||||||
TASK_SOURCE__MULTI,
|
TASK_SOURCE__MERGE,
|
||||||
};
|
};
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
TASK_EXEC__NONE = 1,
|
TASK_EXEC__NONE = 1,
|
||||||
TASK_EXEC__EXEC,
|
TASK_EXEC__PIPE,
|
||||||
|
TASK_EXEC__MERGE,
|
||||||
};
|
};
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
|
@ -129,6 +132,9 @@ typedef struct {
|
||||||
int16_t dispatchMsgType;
|
int16_t dispatchMsgType;
|
||||||
int32_t downstreamTaskId;
|
int32_t downstreamTaskId;
|
||||||
|
|
||||||
|
int32_t nodeId;
|
||||||
|
SEpSet epSet;
|
||||||
|
|
||||||
// source preprocess
|
// source preprocess
|
||||||
|
|
||||||
// exec
|
// exec
|
||||||
|
|
|
@ -1380,7 +1380,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
|
||||||
|
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
void blockDebugShowData(SArray* dataBlocks) {
|
void blockDebugShowData(const SArray* dataBlocks) {
|
||||||
char pBuf[128];
|
char pBuf[128];
|
||||||
int32_t sz = taosArrayGetSize(dataBlocks);
|
int32_t sz = taosArrayGetSize(dataBlocks);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
@ -1398,13 +1398,17 @@ void blockDebugShowData(SArray* dataBlocks) {
|
||||||
printf(" %25s |", pBuf);
|
printf(" %25s |", pBuf);
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_INT:
|
case TSDB_DATA_TYPE_INT:
|
||||||
case TSDB_DATA_TYPE_UINT:
|
|
||||||
printf(" %15d |", *(int32_t*)var);
|
printf(" %15d |", *(int32_t*)var);
|
||||||
break;
|
break;
|
||||||
|
case TSDB_DATA_TYPE_UINT:
|
||||||
|
printf(" %15u |", *(uint32_t*)var);
|
||||||
|
break;
|
||||||
case TSDB_DATA_TYPE_BIGINT:
|
case TSDB_DATA_TYPE_BIGINT:
|
||||||
case TSDB_DATA_TYPE_UBIGINT:
|
|
||||||
printf(" %15ld |", *(int64_t*)var);
|
printf(" %15ld |", *(int64_t*)var);
|
||||||
break;
|
break;
|
||||||
|
case TSDB_DATA_TYPE_UBIGINT:
|
||||||
|
printf(" %15lu |", *(uint64_t*)var);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
printf("\n");
|
printf("\n");
|
||||||
|
|
|
@ -122,7 +122,7 @@ bool tsRetrieveBlockingModel = 0;
|
||||||
// last_row(*), first(*), last_row(ts, col1, col2) query, the result fields will be the original column name
|
// last_row(*), first(*), last_row(ts, col1, col2) query, the result fields will be the original column name
|
||||||
bool tsKeepOriginalColumnName = 0;
|
bool tsKeepOriginalColumnName = 0;
|
||||||
|
|
||||||
// long query death-lock
|
// kill long query
|
||||||
bool tsDeadLockKillQuery = 0;
|
bool tsDeadLockKillQuery = 0;
|
||||||
|
|
||||||
// tsdb config
|
// tsdb config
|
||||||
|
|
|
@ -66,8 +66,11 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet
|
||||||
|
|
||||||
int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
|
int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
|
||||||
int32_t msgLen;
|
int32_t msgLen;
|
||||||
|
pTask->nodeId = pVgroup->vgId;
|
||||||
|
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
|
|
||||||
plan->execNode.nodeId = pVgroup->vgId;
|
plan->execNode.nodeId = pVgroup->vgId;
|
||||||
plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
plan->execNode.epSet = pTask->epSet;
|
||||||
|
|
||||||
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
|
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
@ -86,8 +89,12 @@ SSnodeObj* mndSchedFetchSnode(SMnode* pMnode) {
|
||||||
int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan,
|
int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan,
|
||||||
const SSnodeObj* pSnode) {
|
const SSnodeObj* pSnode) {
|
||||||
int32_t msgLen;
|
int32_t msgLen;
|
||||||
plan->execNode.nodeId = pSnode->id;
|
|
||||||
plan->execNode.epSet = mndAcquireEpFromSnode(pMnode, pSnode);
|
pTask->nodeId = 0;
|
||||||
|
pTask->epSet = mndAcquireEpFromSnode(pMnode, pSnode);
|
||||||
|
|
||||||
|
plan->execNode.nodeId = 0;
|
||||||
|
plan->execNode.epSet = pTask->epSet;
|
||||||
|
|
||||||
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
|
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
@ -97,9 +104,23 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
|
||||||
|
void* pIter = NULL;
|
||||||
|
SVgObj* pVgroup = NULL;
|
||||||
|
while (1) {
|
||||||
|
pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
if (pVgroup->dbUid != dbUid) {
|
||||||
|
sdbRelease(pMnode->pSdb, pVgroup);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
return pVgroup;
|
||||||
|
}
|
||||||
|
return pVgroup;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
SSdb* pSdb = pMnode->pSdb;
|
SSdb* pSdb = pMnode->pSdb;
|
||||||
SVgObj* pVgroup = NULL;
|
|
||||||
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
|
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
|
||||||
if (pPlan == NULL) {
|
if (pPlan == NULL) {
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
@ -108,80 +129,144 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
ASSERT(pStream->vgNum == 0);
|
ASSERT(pStream->vgNum == 0);
|
||||||
|
|
||||||
int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
|
int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
|
||||||
pStream->tasks = taosArrayInit(totLevel, sizeof(SArray));
|
ASSERT(totLevel <= 2);
|
||||||
int32_t lastUsedVgId = 0;
|
pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
|
||||||
|
|
||||||
// gather vnodes
|
for (int32_t level = 0; level < totLevel; level++) {
|
||||||
// gather snodes
|
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
|
||||||
// iterate plan, expand source to vnodes and assign ep to each task
|
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level);
|
||||||
// iterate tasks, assign sink type and sink ep to each task
|
ASSERT(LIST_LENGTH(inner->pNodeList) == 1);
|
||||||
|
|
||||||
for (int32_t revLevel = totLevel - 1; revLevel >= 0; revLevel--) {
|
|
||||||
int32_t level = totLevel - 1 - revLevel;
|
|
||||||
SArray* taskOneLevel = taosArrayInit(0, sizeof(SStreamTask));
|
|
||||||
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, revLevel);
|
|
||||||
int32_t opNum = LIST_LENGTH(inner->pNodeList);
|
|
||||||
ASSERT(opNum == 1);
|
|
||||||
|
|
||||||
SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
|
SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
|
||||||
if (level == 0) {
|
|
||||||
|
// if (level == totLevel - 1 /* or no snode */) {
|
||||||
|
if (level == totLevel - 1) {
|
||||||
|
// last level, source, must assign to vnode
|
||||||
|
// must be scan type
|
||||||
ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);
|
ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);
|
||||||
|
|
||||||
|
// replicate task to each vnode
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
SVgObj* pVgroup;
|
||||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
if (pVgroup->dbUid != pStream->dbUid) {
|
if (pVgroup->dbUid != pStream->dbUid) {
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
lastUsedVgId = pVgroup->vgId;
|
|
||||||
pStream->vgNum++;
|
|
||||||
|
|
||||||
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
|
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
|
||||||
/*pTask->level = level;*/
|
// source part
|
||||||
// TODO
|
pTask->sourceType = TASK_SOURCE__SCAN;
|
||||||
/*pTask->sourceType = STREAM_SOURCE__SUPER;*/
|
|
||||||
/*pTask->sinkType = level == totLevel - 1 ? 1 : 0;*/
|
// sink part
|
||||||
|
if (level == 0) {
|
||||||
|
// only for inplace
|
||||||
|
pTask->sinkType = TASK_SINK__SHOW;
|
||||||
|
pTask->showSink.reserved = 0;
|
||||||
|
} else {
|
||||||
|
pTask->sinkType = TASK_SINK__NONE;
|
||||||
|
}
|
||||||
|
|
||||||
|
// dispatch part
|
||||||
|
if (level == 0) {
|
||||||
|
pTask->dispatchType = TASK_DISPATCH__NONE;
|
||||||
|
// if inplace sink, no dispatcher
|
||||||
|
// if fixed ep, add fixed ep dispatcher
|
||||||
|
// if shuffle, add shuffle dispatcher
|
||||||
|
} else {
|
||||||
|
// add fixed ep dispatcher
|
||||||
|
int32_t lastLevel = level - 1;
|
||||||
|
ASSERT(lastLevel == 0);
|
||||||
|
SArray* pArray = taosArrayGetP(pStream->tasks, lastLevel);
|
||||||
|
// one merge only
|
||||||
|
ASSERT(taosArrayGetSize(pArray) == 1);
|
||||||
|
SStreamTask* lastLevelTask = taosArrayGetP(pArray, lastLevel);
|
||||||
|
pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;
|
||||||
|
pTask->dispatchType = TASK_DISPATCH__FIXED;
|
||||||
|
|
||||||
|
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
|
||||||
|
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
|
||||||
|
}
|
||||||
|
|
||||||
|
// exec part
|
||||||
|
pTask->execType = TASK_EXEC__PIPE;
|
||||||
pTask->exec.parallelizable = 1;
|
pTask->exec.parallelizable = 1;
|
||||||
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
|
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
qDestroyQueryPlan(pPlan);
|
qDestroyQueryPlan(pPlan);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taosArrayPush(taskOneLevel, pTask);
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
taosArrayPush(taskOneLevel, &pTask);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
// merge plan
|
||||||
|
|
||||||
|
// TODO if has snode, assign to snode
|
||||||
|
|
||||||
|
// else, assign to vnode
|
||||||
|
ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE);
|
||||||
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
|
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
|
||||||
/*pTask->level = level;*/
|
|
||||||
/*pTask->sourceType = STREAM_SOURCE__NONE;*/
|
|
||||||
/*pTask->sinkType = level == totLevel - 1 ? 1 : 0;*/
|
|
||||||
pTask->exec.parallelizable = plan->subplanType == SUBPLAN_TYPE_SCAN;
|
|
||||||
|
|
||||||
SSnodeObj* pSnode = mndSchedFetchSnode(pMnode);
|
// source part, currently only support multi source
|
||||||
if (pSnode == NULL || tsStreamSchedV) {
|
pTask->sourceType = TASK_SOURCE__PIPE;
|
||||||
ASSERT(lastUsedVgId != 0);
|
|
||||||
SVgObj* pVg = mndAcquireVgroup(pMnode, lastUsedVgId);
|
// sink part
|
||||||
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVg) < 0) {
|
pTask->sinkType = TASK_SINK__SHOW;
|
||||||
sdbRelease(pSdb, pVg);
|
/*pTask->sinkType = TASK_SINK__NONE;*/
|
||||||
qDestroyQueryPlan(pPlan);
|
|
||||||
return -1;
|
// dispatch part
|
||||||
}
|
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
|
||||||
sdbRelease(pSdb, pVg);
|
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
|
||||||
} else {
|
|
||||||
if (mndAssignTaskToSnode(pMnode, pTrans, pTask, plan, pSnode) < 0) {
|
// exec part
|
||||||
sdbRelease(pSdb, pSnode);
|
pTask->execType = TASK_EXEC__MERGE;
|
||||||
qDestroyQueryPlan(pPlan);
|
pTask->exec.parallelizable = 0;
|
||||||
return -1;
|
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->dbUid);
|
||||||
}
|
ASSERT(pVgroup);
|
||||||
|
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
qDestroyQueryPlan(pPlan);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
sdbRelease(pMnode->pSdb, pSnode);
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
taosArrayPush(taskOneLevel, &pTask);
|
||||||
taosArrayPush(taskOneLevel, pTask);
|
|
||||||
}
|
}
|
||||||
taosArrayPush(pStream->tasks, taskOneLevel);
|
|
||||||
|
taosArrayPush(pStream->tasks, &taskOneLevel);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (totLevel == 2) {
|
||||||
|
void* pIter = NULL;
|
||||||
|
while (1) {
|
||||||
|
SVgObj* pVgroup;
|
||||||
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
if (pVgroup->dbUid != pStream->dbUid) {
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
|
||||||
|
|
||||||
|
// source part
|
||||||
|
pTask->sourceType = TASK_SOURCE__MERGE;
|
||||||
|
|
||||||
|
// sink part
|
||||||
|
pTask->sinkType = TASK_SINK__SHOW;
|
||||||
|
|
||||||
|
// dispatch part
|
||||||
|
pTask->dispatchType = TASK_DISPATCH__NONE;
|
||||||
|
|
||||||
|
// exec part
|
||||||
|
pTask->execType = TASK_EXEC__NONE;
|
||||||
|
pTask->exec.parallelizable = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// free memory
|
||||||
qDestroyQueryPlan(pPlan);
|
qDestroyQueryPlan(pPlan);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,12 +14,11 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
|
#include "tdatablock.h"
|
||||||
#include "tqInt.h"
|
#include "tqInt.h"
|
||||||
#include "tqMetaStore.h"
|
#include "tqMetaStore.h"
|
||||||
#include "tstream.h"
|
#include "tstream.h"
|
||||||
|
|
||||||
void blockDebugShowData(SArray* dataBlocks);
|
|
||||||
|
|
||||||
int32_t tqInit() { return tqPushMgrInit(); }
|
int32_t tqInit() { return tqPushMgrInit(); }
|
||||||
|
|
||||||
void tqCleanUp() { tqPushMgrCleanUp(); }
|
void tqCleanUp() { tqPushMgrCleanUp(); }
|
||||||
|
@ -445,18 +444,19 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
|
||||||
if (pTask->execType == TASK_EXEC__NONE) return 0;
|
if (pTask->execType == TASK_EXEC__NONE) return 0;
|
||||||
|
|
||||||
pTask->exec.numOfRunners = parallel;
|
pTask->exec.numOfRunners = parallel;
|
||||||
|
pTask->exec.runners = calloc(parallel, sizeof(SStreamRunner));
|
||||||
|
if (pTask->exec.runners == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
for (int32_t i = 0; i < parallel; i++) {
|
for (int32_t i = 0; i < parallel; i++) {
|
||||||
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
|
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
|
||||||
SReadHandle handle = {
|
SReadHandle handle = {
|
||||||
.reader = pReadHandle,
|
.reader = pReadHandle,
|
||||||
.meta = pTq->pVnodeMeta,
|
.meta = pTq->pVnodeMeta,
|
||||||
};
|
};
|
||||||
pTask->exec.runners = calloc(parallel, sizeof(SStreamRunner));
|
|
||||||
if (pTask->exec.runners == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
pTask->exec.runners[i].inputHandle = pReadHandle;
|
pTask->exec.runners[i].inputHandle = pReadHandle;
|
||||||
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||||
|
ASSERT(pTask->exec.runners[i].executor);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -473,7 +473,10 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
}
|
}
|
||||||
tCoderClear(&decoder);
|
tCoderClear(&decoder);
|
||||||
|
|
||||||
tqExpandTask(pTq, pTask, 8);
|
if (tqExpandTask(pTq, pTask, 4) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask));
|
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask));
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -22,10 +22,13 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
|
||||||
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK && pTask->sourceType != TASK_SOURCE__SCAN) return 0;
|
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK && pTask->sourceType != TASK_SOURCE__SCAN) return 0;
|
||||||
|
|
||||||
// exec
|
// exec
|
||||||
if (pTask->execType == TASK_EXEC__EXEC) {
|
if (pTask->execType != TASK_EXEC__NONE) {
|
||||||
ASSERT(workId < pTask->exec.numOfRunners);
|
ASSERT(workId < pTask->exec.numOfRunners);
|
||||||
void* exec = pTask->exec.runners[workId].executor;
|
void* exec = pTask->exec.runners[workId].executor;
|
||||||
pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||||
|
if (pRes == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
|
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
|
||||||
qSetStreamInput(exec, input, inputType);
|
qSetStreamInput(exec, input, inputType);
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -79,7 +82,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
|
||||||
}
|
}
|
||||||
|
|
||||||
// dispatch
|
// dispatch
|
||||||
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
|
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
|
||||||
SStreamTaskExecReq req = {
|
SStreamTaskExecReq req = {
|
||||||
.streamId = pTask->streamId,
|
.streamId = pTask->streamId,
|
||||||
.taskId = pTask->taskId,
|
.taskId = pTask->taskId,
|
||||||
|
@ -101,28 +104,54 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
|
||||||
.code = 0,
|
.code = 0,
|
||||||
.msgType = pTask->dispatchMsgType,
|
.msgType = pTask->dispatchMsgType,
|
||||||
};
|
};
|
||||||
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
|
|
||||||
int32_t qType;
|
int32_t qType;
|
||||||
if (pTask->dispatchMsgType == TDMT_VND_TASK_PIPE_EXEC || pTask->dispatchMsgType == TDMT_SND_TASK_PIPE_EXEC) {
|
if (pTask->dispatchMsgType == TDMT_VND_TASK_PIPE_EXEC || pTask->dispatchMsgType == TDMT_SND_TASK_PIPE_EXEC) {
|
||||||
qType = FETCH_QUEUE;
|
qType = FETCH_QUEUE;
|
||||||
} else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC ||
|
} else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC ||
|
||||||
pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) {
|
pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) {
|
||||||
qType = MERGE_QUEUE;
|
qType = MERGE_QUEUE;
|
||||||
} else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) {
|
} else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) {
|
||||||
qType = WRITE_QUEUE;
|
qType = WRITE_QUEUE;
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);
|
|
||||||
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
|
|
||||||
((SMsgHead*)buf)->vgId = pTask->fixedEpDispatcher.nodeId;
|
|
||||||
SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;
|
|
||||||
tmsgSendReq(pMsgCb, pEpSet, &dispatchMsg);
|
|
||||||
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
|
||||||
// TODO
|
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);
|
||||||
|
|
||||||
|
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
|
||||||
|
SStreamTaskExecReq req = {
|
||||||
|
.streamId = pTask->streamId,
|
||||||
|
.taskId = pTask->taskId,
|
||||||
|
.data = pRes,
|
||||||
|
};
|
||||||
|
|
||||||
|
int32_t tlen = sizeof(SMsgHead) + tEncodeSStreamTaskExecReq(NULL, &req);
|
||||||
|
void* buf = rpcMallocCont(tlen);
|
||||||
|
|
||||||
|
if (buf == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
((SMsgHead*)buf)->vgId = htonl(pTask->fixedEpDispatcher.nodeId);
|
||||||
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
tEncodeSStreamTaskExecReq(&abuf, &req);
|
||||||
|
|
||||||
|
SRpcMsg dispatchMsg = {
|
||||||
|
.pCont = buf,
|
||||||
|
.contLen = tlen,
|
||||||
|
.code = 0,
|
||||||
|
.msgType = pTask->dispatchMsgType,
|
||||||
|
};
|
||||||
|
|
||||||
|
SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;
|
||||||
|
|
||||||
|
tmsgSendReq(pMsgCb, pEpSet, &dispatchMsg);
|
||||||
|
|
||||||
|
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
||||||
|
// TODO
|
||||||
|
|
||||||
|
} else {
|
||||||
|
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -168,7 +197,10 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
|
||||||
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
|
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pTask->downstreamTaskId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pTask->downstreamTaskId) < 0) return -1;
|
||||||
|
|
||||||
if (pTask->execType == TASK_EXEC__EXEC) {
|
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
|
||||||
|
if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1;
|
||||||
|
|
||||||
|
if (pTask->execType != TASK_EXEC__NONE) {
|
||||||
if (tEncodeI8(pEncoder, pTask->exec.parallelizable) < 0) return -1;
|
if (tEncodeI8(pEncoder, pTask->exec.parallelizable) < 0) return -1;
|
||||||
if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
@ -203,7 +235,10 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
|
||||||
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
|
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pTask->downstreamTaskId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pTask->downstreamTaskId) < 0) return -1;
|
||||||
|
|
||||||
if (pTask->execType == TASK_EXEC__EXEC) {
|
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
|
||||||
|
if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1;
|
||||||
|
|
||||||
|
if (pTask->execType != TASK_EXEC__NONE) {
|
||||||
if (tDecodeI8(pDecoder, &pTask->exec.parallelizable) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pTask->exec.parallelizable) < 0) return -1;
|
||||||
if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
|
if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue