feat(stream): stream support multiple type input

This commit is contained in:
Liu Jicong 2022-06-22 21:21:39 +08:00
parent df1967de4b
commit 14a488c92c
8 changed files with 40 additions and 57 deletions

View File

@ -239,17 +239,14 @@ typedef struct {
struct SStreamTask {
int64_t streamId;
int32_t taskId;
int8_t inputType;
int8_t taskStatus;
int8_t execStatus;
int8_t isDataScan;
int8_t execType;
int8_t sinkType;
int8_t dispatchType;
int16_t dispatchMsgType;
int8_t isDataScan;
int8_t taskStatus;
int8_t execStatus;
// node info
int32_t selfChildId;

View File

@ -270,10 +270,8 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb
pTask->nodeId = pVgroup->vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
pTask->isDataScan = 0;
// source
pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
pTask->isDataScan = 0;
// exec
pTask->execType = TASK_EXEC__NONE;
@ -320,10 +318,8 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj*
#endif
pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg);
pTask->isDataScan = 0;
// source
pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
pTask->isDataScan = 0;
// exec
pTask->execType = TASK_EXEC__NONE;
@ -392,12 +388,10 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pInnerTask = tNewSStreamTask(pStream->uid);
mndAddTaskToTaskSet(taskInnerLevel, pInnerTask);
pInnerTask->isDataScan = 0;
pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*));
// input
pInnerTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
// source
pInnerTask->isDataScan = 0;
// trigger
pInnerTask->triggerParam = pStream->triggerParam;
@ -458,11 +452,9 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
mndAddTaskToTaskSet(taskSourceLevel, pTask);
// source
pTask->isDataScan = 1;
// input
pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK;
// add fixed vg dispatch
pTask->sinkType = TASK_SINK__NONE;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
@ -517,10 +509,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
mndAddTaskToTaskSet(taskOneLevel, pTask);
pTask->isDataScan = 1;
// input
pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK;
pTask->isDataScan = 1;
// trigger
pTask->triggerParam = pStream->triggerParam;

View File

@ -426,6 +426,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
ASSERT(0);
}
tDecoderClear(&decoder);
ASSERT(pTask->isDataScan == 0 || pTask->isDataScan == 1);
pTask->execStatus = TASK_EXEC_STATUS__IDLE;
@ -505,7 +506,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
continue;
}
if (pTask->inputType != STREAM_INPUT__DATA_SUBMIT) continue;
if (!pTask->isDataScan) continue;
if (!failed) {
if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) {

View File

@ -345,8 +345,8 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
while (1) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = (SStreamTask*)pIter;
if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) {
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->isDataScan) {
int32_t code = qUpdateQualifiedTableId(pTask->exec.executor, tbUidList, isAdd);
ASSERT(code == 0);
}

View File

@ -40,7 +40,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
SStreamBlockScanInfo* pInfo = pOperator->info;
pInfo->assignBlockUid = assignUid;
// the block type can not be changed in the streamscan operators
// no need to check
#if 0
if (pInfo->blockType == 0) {
pInfo->blockType = type;
@ -49,10 +49,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
return TSDB_CODE_QRY_APP_ERROR;
}
#endif
// rollup sma, the same qTaskInfo is used to insert data by SubmitReq and fetch result by SSDataBlock
if (pInfo->blockType != type) {
pInfo->blockType = type;
}
if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
if (tqReadHandleSetMsg(pInfo->streamBlockReader, input, 0) < 0) {

View File

@ -916,6 +916,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
size_t total = taosArrayGetSize(pInfo->pBlockLists);
// TODO: refactor
if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) {
if (pInfo->validBlockIndex >= total) {
/*doClearBufferedBlocks(pInfo);*/
@ -1969,8 +1970,7 @@ int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) {
}
int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId,
SNode* pTagCond) {
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) {
int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -1992,7 +1992,6 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle
int32_t doCreateMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId,
uint64_t taskId) {
SQueryTableDataCond cond = {0};
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -24,12 +24,11 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
SStreamTrigger* pTrigger = (SStreamTrigger*)data;
qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_DATA_TYPE_SSDATA_BLOCK, false);
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->isDataScan);
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
ASSERT(pTask->inputType == STREAM_INPUT__DATA_SUBMIT);
qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK, false);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK) {
SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
ASSERT(pTask->inputType == STREAM_INPUT__DATA_BLOCK);
SArray* blocks = pBlock->blocks;
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, false);
} else if (pItem->type == STREAM_INPUT__DROP) {
@ -89,17 +88,17 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
return NULL;
}
if (((SStreamQueueItem*)data)->type == STREAM_INPUT__TRIGGER) {
int8_t type = ((SStreamQueueItem*)data)->type;
if (type == STREAM_INPUT__TRIGGER) {
blockDataDestroy(((SStreamTrigger*)data)->pBlock);
taosFreeQitem(data);
} else {
if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) {
streamDataSubmitRefDec((SStreamDataSubmit*)data);
taosFreeQitem(data);
} else {
} else if (type == STREAM_INPUT__DATA_BLOCK) {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(data);
}
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->isDataScan);
streamDataSubmitRefDec((SStreamDataSubmit*)data);
taosFreeQitem(data);
}
streamQueueProcessSuccess(pTask->inputQueue);
return taosArrayInit(0, sizeof(SSDataBlock));

View File

@ -50,14 +50,14 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
/*if (tStartEncode(pEncoder) < 0) return -1;*/
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->inputType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->taskStatus) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->execStatus) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->isDataScan) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1;
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->isDataScan) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->taskStatus) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->execStatus) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
@ -106,14 +106,14 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
/*if (tStartDecode(pDecoder) < 0) return -1;*/
if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->inputType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->taskStatus) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->execStatus) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->isDataScan) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1;
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->isDataScan) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->taskStatus) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->execStatus) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;