modify stream task

This commit is contained in:
Liu Jicong 2022-03-24 16:18:08 +08:00
parent 3e455e374e
commit 4cf88c35da
9 changed files with 104 additions and 77 deletions

View File

@ -1167,7 +1167,7 @@ typedef struct {
typedef struct { typedef struct {
char name[TSDB_TOPIC_FNAME_LEN]; char name[TSDB_TOPIC_FNAME_LEN];
char outputTbName[TSDB_TABLE_NAME_LEN]; char outputSTbName[TSDB_TABLE_FNAME_LEN];
int8_t igExists; int8_t igExists;
char* sql; char* sql;
char* ast; char* ast;
@ -2371,6 +2371,26 @@ enum {
STREAM_NEXT_OP_DST__SND, STREAM_NEXT_OP_DST__SND,
}; };
enum {
STREAM_SOURCE_TYPE__NONE = 1,
STREAM_SOURCE_TYPE__SUPER,
STREAM_SOURCE_TYPE__CHILD,
STREAM_SOURCE_TYPE__NORMAL,
};
enum {
STREAM_SINK_TYPE__NONE = 1,
STREAM_SINK_TYPE__INPLACE,
STREAM_SINK_TYPE__ASSIGNED,
STREAM_SINK_TYPE__MULTIPLE,
STREAM_SINK_TYPE__TEMPORARY,
};
enum {
STREAM_TYPE__NORMAL = 1,
STREAM_TYPE__SMA,
};
typedef struct { typedef struct {
void* inputHandle; void* inputHandle;
void* executor; void* executor;
@ -2381,28 +2401,33 @@ typedef struct {
int32_t taskId; int32_t taskId;
int32_t level; int32_t level;
int8_t status; int8_t status;
int8_t pipeSource;
int8_t pipeSink;
int8_t numOfRunners;
int8_t parallelizable; int8_t parallelizable;
int8_t nextOpDst; // vnode or snode
// vnode or snode
int8_t nextOpDst;
int8_t sourceType;
int8_t sinkType;
// for sink type assigned
int32_t sinkVgId;
SEpSet NextOpEp; SEpSet NextOpEp;
// executor meta info
char* qmsg; char* qmsg;
// not applied to encoder and decoder
// followings are not applied to encoder and decoder
int8_t numOfRunners;
SStreamRunner runner[8]; SStreamRunner runner[8];
// void* executor;
// void* stateStore;
// storage handle
} SStreamTask; } SStreamTask;
static FORCE_INLINE SStreamTask* streamTaskNew(int64_t streamId, int32_t level) { static FORCE_INLINE SStreamTask* streamTaskNew(int64_t streamId) {
SStreamTask* pTask = (SStreamTask*)calloc(1, sizeof(SStreamTask)); SStreamTask* pTask = (SStreamTask*)calloc(1, sizeof(SStreamTask));
if (pTask == NULL) { if (pTask == NULL) {
return NULL; return NULL;
} }
pTask->taskId = tGenIdPI32(); pTask->taskId = tGenIdPI32();
pTask->streamId = streamId; pTask->streamId = streamId;
pTask->level = level;
pTask->status = STREAM_TASK_STATUS__RUNNING; pTask->status = STREAM_TASK_STATUS__RUNNING;
pTask->qmsg = NULL; pTask->qmsg = NULL;
return pTask; return pTask;

View File

@ -505,7 +505,7 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa
.sql = (char*)sql, .sql = (char*)sql,
}; };
tNameExtractFullName(&name, req.name); tNameExtractFullName(&name, req.name);
strcpy(req.outputTbName, tbName); strcpy(req.outputSTbName, tbName);
int tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req); int tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req);
void* buf = malloc(tlen); void* buf = malloc(tlen);

View File

@ -3045,7 +3045,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->outputTbName) < 0) return -1; if (tEncodeCStr(&encoder, pReq->outputSTbName) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
if (tEncodeI32(&encoder, sqlLen) < 0) return -1; if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
if (tEncodeI32(&encoder, astLen) < 0) return -1; if (tEncodeI32(&encoder, astLen) < 0) return -1;
@ -3068,7 +3068,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->outputTbName) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->outputSTbName) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1; if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1; if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
if (tDecodeI32(&decoder, &astLen) < 0) return -1; if (tDecodeI32(&decoder, &astLen) < 0) return -1;
@ -3101,12 +3101,14 @@ int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) {
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->level) < 0) return -1; if (tEncodeI32(pEncoder, pTask->level) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->status) < 0) return -1; if (tEncodeI8(pEncoder, pTask->status) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->pipeSource) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->pipeSink) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->parallelizable) < 0) return -1; if (tEncodeI8(pEncoder, pTask->parallelizable) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->nextOpDst) < 0) return -1; if (tEncodeI8(pEncoder, pTask->nextOpDst) < 0) return -1;
// if (tEncodeI8(pEncoder, pTask->numOfRunners) < 0) return -1; if (tEncodeI8(pEncoder, pTask->sourceType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1;
if (pTask->sinkType == STREAM_SINK_TYPE__ASSIGNED) {
if (tEncodeI32(pEncoder, pTask->sinkVgId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->NextOpEp) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->NextOpEp) < 0) return -1;
}
if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1;
/*tEndEncode(pEncoder);*/ /*tEndEncode(pEncoder);*/
return pEncoder->pos; return pEncoder->pos;
@ -3118,12 +3120,14 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) {
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->pipeSource) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->pipeSink) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->parallelizable) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->parallelizable) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->nextOpDst) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->nextOpDst) < 0) return -1;
// if (tDecodeI8(pDecoder, &pTask->numOfRunners) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->sourceType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1;
if (pTask->sinkType == STREAM_SINK_TYPE__ASSIGNED) {
if (tDecodeI32(pDecoder, &pTask->sinkVgId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->NextOpEp) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->NextOpEp) < 0) return -1;
}
if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1;
/*tEndDecode(pDecoder);*/ /*tEndDecode(pDecoder);*/
return 0; return 0;

View File

@ -720,6 +720,7 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
typedef struct { typedef struct {
char name[TSDB_TOPIC_FNAME_LEN]; char name[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
char outputSTbName[TSDB_TABLE_FNAME_LEN];
int64_t createTime; int64_t createTime;
int64_t updateTime; int64_t updateTime;
int64_t uid; int64_t uid;
@ -735,7 +736,6 @@ typedef struct {
char* physicalPlan; char* physicalPlan;
SArray* tasks; // SArray<SArray<SStreamTask>> SArray* tasks; // SArray<SArray<SStreamTask>>
SArray* ColAlias; SArray* ColAlias;
char* outputSTbName;
} SStreamObj; } SStreamObj;
int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj); int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj);

View File

@ -16,6 +16,7 @@
#include "mndDef.h" #include "mndDef.h"
int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
int32_t sz = 0;
int32_t outputNameSz = 0; int32_t outputNameSz = 0;
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->db) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->db) < 0) return -1;
@ -30,20 +31,19 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
// TODO encode tasks // TODO encode tasks
if (pObj->tasks) { if (pObj->tasks) {
int32_t sz = taosArrayGetSize(pObj->tasks); sz = taosArrayGetSize(pObj->tasks);
tEncodeI32(pEncoder, sz); }
if (tEncodeI32(pEncoder, sz) < 0) return -1;
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SArray *pArray = taosArrayGet(pObj->tasks, i); SArray *pArray = taosArrayGet(pObj->tasks, i);
int32_t innerSz = taosArrayGetSize(pArray); int32_t innerSz = taosArrayGetSize(pArray);
tEncodeI32(pEncoder, innerSz); if (tEncodeI32(pEncoder, innerSz) < 0) return -1;
for (int32_t j = 0; j < innerSz; j++) { for (int32_t j = 0; j < innerSz; j++) {
SStreamTask *pTask = taosArrayGet(pArray, j); SStreamTask *pTask = taosArrayGet(pArray, j);
tEncodeSStreamTask(pEncoder, pTask); if (tEncodeSStreamTask(pEncoder, pTask) < 0) return -1;
} }
} }
} else {
tEncodeI32(pEncoder, 0);
}
if (pObj->ColAlias != NULL) { if (pObj->ColAlias != NULL) {
outputNameSz = taosArrayGetSize(pObj->ColAlias); outputNameSz = taosArrayGetSize(pObj->ColAlias);
@ -68,6 +68,7 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
pObj->tasks = NULL;
int32_t sz; int32_t sz;
if (tDecodeI32(pDecoder, &sz) < 0) return -1; if (tDecodeI32(pDecoder, &sz) < 0) return -1;
if (sz != 0) { if (sz != 0) {
@ -83,15 +84,15 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
} }
taosArrayPush(pObj->tasks, pArray); taosArrayPush(pObj->tasks, pArray);
} }
} else {
pObj->tasks = NULL;
} }
int32_t outputNameSz; int32_t outputNameSz;
if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1; if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1;
if (outputNameSz != 0) {
pObj->ColAlias = taosArrayInit(outputNameSz, sizeof(void *)); pObj->ColAlias = taosArrayInit(outputNameSz, sizeof(void *));
if (pObj->ColAlias == NULL) { if (pObj->ColAlias == NULL) {
return -1; return -1;
} }
}
for (int32_t i = 0; i < outputNameSz; i++) { for (int32_t i = 0; i < outputNameSz; i++) {
char *name; char *name;
if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1;

View File

@ -58,7 +58,7 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet
action.contLen = tlen; action.contLen = tlen;
action.msgType = type; action.msgType = type;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
rpcFreeCont(buf); free(buf);
return -1; return -1;
} }
return 0; return 0;
@ -131,13 +131,12 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
lastUsedVgId = pVgroup->vgId; lastUsedVgId = pVgroup->vgId;
pStream->vgNum++; pStream->vgNum++;
// send to vnode
SStreamTask* pTask = streamTaskNew(pStream->uid, level); SStreamTask* pTask = streamTaskNew(pStream->uid);
pTask->pipeSource = 1; pTask->level = level;
pTask->pipeSink = level == totLevel - 1 ? 1 : 0; pTask->sourceType = 1;
pTask->sinkType = level == totLevel - 1 ? 1 : 0;
pTask->parallelizable = 1; pTask->parallelizable = 1;
// TODO: set to
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
@ -146,13 +145,15 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
taosArrayPush(taskOneLevel, pTask); taosArrayPush(taskOneLevel, pTask);
} }
} else { } else {
SStreamTask* pTask = streamTaskNew(pStream->uid, level); SStreamTask* pTask = streamTaskNew(pStream->uid);
pTask->pipeSource = 0; pTask->level = level;
pTask->pipeSink = level == totLevel - 1 ? 1 : 0; pTask->sourceType = 0;
pTask->sinkType = level == totLevel - 1 ? 1 : 0;
pTask->parallelizable = plan->subplanType == SUBPLAN_TYPE_SCAN; pTask->parallelizable = plan->subplanType == SUBPLAN_TYPE_SCAN;
pTask->nextOpDst = STREAM_NEXT_OP_DST__VND; pTask->nextOpDst = STREAM_NEXT_OP_DST__VND;
if (tsStreamSchedV) { SSnodeObj* pSnode = mndSchedFetchSnode(pMnode);
if (pSnode == NULL || tsStreamSchedV) {
ASSERT(lastUsedVgId != 0); ASSERT(lastUsedVgId != 0);
SVgObj* pVg = mndAcquireVgroup(pMnode, lastUsedVgId); SVgObj* pVg = mndAcquireVgroup(pMnode, lastUsedVgId);
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVg) < 0) { if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVg) < 0) {
@ -162,24 +163,19 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
} }
sdbRelease(pSdb, pVg); sdbRelease(pSdb, pVg);
} else { } else {
SSnodeObj* pSnode = mndSchedFetchSnode(pMnode);
if (pSnode != NULL) {
if (mndAssignTaskToSnode(pMnode, pTrans, pTask, plan, pSnode) < 0) { if (mndAssignTaskToSnode(pMnode, pTrans, pTask, plan, pSnode) < 0) {
sdbRelease(pSdb, pSnode); sdbRelease(pSdb, pSnode);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
return -1; return -1;
} }
}
sdbRelease(pMnode->pSdb, pSnode); sdbRelease(pMnode->pSdb, pSnode);
} else {
// TODO: assign to one vg
ASSERT(0);
}
}
taosArrayPush(taskOneLevel, pTask); taosArrayPush(taskOneLevel, pTask);
} }
taosArrayPush(pStream->tasks, taskOneLevel); taosArrayPush(pStream->tasks, taskOneLevel);
} }
qDestroyQueryPlan(pPlan);
return 0; return 0;
} }

View File

@ -272,6 +272,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
if (nodesStringToNode(ast, &pAst) < 0) { if (nodesStringToNode(ast, &pAst) < 0) {
return -1; return -1;
} }
#if 1
SArray *names = mndExtractNamesFromAst(pAst); SArray *names = mndExtractNamesFromAst(pAst);
printf("|"); printf("|");
for (int i = 0; i < taosArrayGetSize(names); i++) { for (int i = 0; i < taosArrayGetSize(names); i++) {
@ -280,6 +281,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
printf("\n=======================================================\n"); printf("\n=======================================================\n");
pStream->ColAlias = names; pStream->ColAlias = names;
#endif
if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, &pStream->physicalPlan)) { if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, &pStream->physicalPlan)) {
mError("topic:%s, failed to get plan since %s", pStream->name, terrstr()); mError("topic:%s, failed to get plan since %s", pStream->name, terrstr());
@ -308,6 +310,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
SStreamObj streamObj = {0}; SStreamObj streamObj = {0};
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN); tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
tstrncpy(streamObj.db, pDb->name, TSDB_DB_FNAME_LEN); tstrncpy(streamObj.db, pDb->name, TSDB_DB_FNAME_LEN);
tstrncpy(streamObj.outputSTbName, pCreate->outputSTbName, TSDB_TABLE_FNAME_LEN);
streamObj.createTime = taosGetTimestampMs(); streamObj.createTime = taosGetTimestampMs();
streamObj.updateTime = streamObj.createTime; streamObj.updateTime = streamObj.createTime;
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name)); streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));

View File

@ -358,9 +358,7 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
return 0; return 0;
} }
int32_t mndStart(SMnode *pMnode) { int32_t mndStart(SMnode *pMnode) { return mndInitTimer(pMnode); }
return mndInitTimer(pMnode);
}
int32_t mndProcessMsg(SNodeMsg *pMsg) { int32_t mndProcessMsg(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode; SMnode *pMnode = pMsg->pNode;
@ -419,7 +417,7 @@ int64_t mndGenerateUid(char *name, int32_t len) {
int64_t x = (us & 0x000000FFFFFFFFFF) << 24; int64_t x = (us & 0x000000FFFFFFFFFF) << 24;
int64_t uuid = x + ((hashval & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul)); int64_t uuid = x + ((hashval & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
if (uuid) { if (uuid) {
return abs(uuid); return llabs(uuid);
} }
} while (true); } while (true);
} }

View File

@ -561,7 +561,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter); pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break; if (pIter == NULL) break;
SStreamTask* pTask = (SStreamTask*)pIter; SStreamTask* pTask = (SStreamTask*)pIter;
if (!pTask->pipeSource) continue; if (!pTask->sourceType) continue;
int32_t workerId = 0; int32_t workerId = 0;
void* exec = pTask->runner[workerId].executor; void* exec = pTask->runner[workerId].executor;
@ -578,7 +578,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
} }
taosArrayPush(pRes, output); taosArrayPush(pRes, output);
} }
if (pTask->pipeSink) { if (pTask->sinkType) {
// write back // write back
/*printf("reach end\n");*/ /*printf("reach end\n");*/
tqDebugShowSSData(pRes); tqDebugShowSSData(pRes);