refactor(stream)
This commit is contained in:
parent
a3d1b0a50d
commit
f77f910264
|
@ -40,6 +40,7 @@ enum {
|
|||
TASK_INPUT_STATUS__BLOCKED,
|
||||
TASK_INPUT_STATUS__RECOVER,
|
||||
TASK_INPUT_STATUS__STOP,
|
||||
TASK_INPUT_STATUS__FAILED,
|
||||
};
|
||||
|
||||
enum {
|
||||
|
@ -234,7 +235,9 @@ struct SStreamTask {
|
|||
int8_t outputStatus;
|
||||
|
||||
STaosQueue* inputQ;
|
||||
STaosQall* inputQAll;
|
||||
STaosQueue* outputQ;
|
||||
STaosQall* outputQAll;
|
||||
|
||||
// application storage
|
||||
void* ahandle;
|
||||
|
@ -282,7 +285,7 @@ int32_t streamDequeueOutput(SStreamTask* pTask, void** output);
|
|||
|
||||
int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId);
|
||||
|
||||
int32_t streamTaskExecNew(SStreamTask* pTask);
|
||||
int32_t streamTaskRun(SStreamTask* pTask);
|
||||
|
||||
int32_t streamTaskHandleInput(SStreamTask* pTask, void* data);
|
||||
|
||||
|
|
|
@ -194,6 +194,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
|
|||
|
||||
// source
|
||||
pTask->sourceType = TASK_SOURCE__MERGE;
|
||||
pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
|
||||
|
||||
// exec
|
||||
pTask->execType = TASK_EXEC__NONE;
|
||||
|
@ -235,6 +236,7 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
|
|||
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
// source
|
||||
pTask->sourceType = TASK_SOURCE__MERGE;
|
||||
pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
|
||||
|
||||
// exec
|
||||
pTask->execType = TASK_EXEC__NONE;
|
||||
|
@ -309,6 +311,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
|||
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
|
||||
// source part
|
||||
pTask->sourceType = TASK_SOURCE__SCAN;
|
||||
pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK;
|
||||
|
||||
// sink part
|
||||
if (level == 0) {
|
||||
|
@ -372,6 +375,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
|||
|
||||
// source part, currently only support multi source
|
||||
pTask->sourceType = TASK_SOURCE__PIPE;
|
||||
pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
|
||||
|
||||
// sink part
|
||||
pTask->sinkType = TASK_SINK__NONE;
|
||||
|
@ -459,6 +463,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
|||
|
||||
// source part
|
||||
pTask->sourceType = TASK_SOURCE__MERGE;
|
||||
pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
|
||||
|
||||
// sink part
|
||||
pTask->sinkType = TASK_SINK__NONE;
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
|
||||
#include "tq.h"
|
||||
#include "tqueue.h"
|
||||
|
||||
int32_t tqInit() {
|
||||
//
|
||||
|
@ -1032,6 +1033,59 @@ int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId)
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessStreamTrigger2(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
|
||||
void* pIter = NULL;
|
||||
bool failed = false;
|
||||
|
||||
SStreamDataSubmit* pSubmit = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
|
||||
if (pSubmit == NULL) {
|
||||
failed = true;
|
||||
}
|
||||
pSubmit->dataRef = taosMemoryMalloc(sizeof(int32_t));
|
||||
if (pSubmit->dataRef == NULL) {
|
||||
failed = true;
|
||||
}
|
||||
|
||||
pSubmit->type = STREAM_DATA_TYPE_SUBMIT_BLOCK;
|
||||
pSubmit->sourceVer = ver;
|
||||
pSubmit->sourceVg = pTq->pVnode->config.vgId;
|
||||
pSubmit->data = pReq;
|
||||
*pSubmit->dataRef = 1;
|
||||
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
|
||||
if (pIter == NULL) break;
|
||||
SStreamTask* pTask = (SStreamTask*)pIter;
|
||||
if (pTask->inputType != STREAM_INPUT__DATA_SUBMIT) continue;
|
||||
|
||||
int8_t inputStatus = atomic_load_8(&pTask->inputStatus);
|
||||
if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
|
||||
if (failed) {
|
||||
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
|
||||
continue;
|
||||
}
|
||||
|
||||
streamDataSubmitRefInc(pSubmit);
|
||||
taosWriteQitem(pTask->inputQ, pSubmit);
|
||||
|
||||
int8_t execStatus = atomic_load_8(&pTask->status);
|
||||
if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) {
|
||||
// TODO dispatch task launch msg to fetch queue
|
||||
}
|
||||
|
||||
} else {
|
||||
// blocked or stopped, do nothing
|
||||
}
|
||||
}
|
||||
|
||||
if (!failed) {
|
||||
streamDataSubmitRefDec(pSubmit);
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskExec2(STQ* pTq, char* msg, int32_t msgLen) {
|
||||
SStreamTaskExecReq req = {0};
|
||||
tDecodeSStreamTaskExecReq(msg, &req);
|
||||
|
@ -1051,7 +1105,7 @@ int32_t tqProcessTaskExec2(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
// try exec
|
||||
int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
|
||||
if (execStatus == TASK_STATUS__IDLE) {
|
||||
if (streamTaskExecNew(pTask) < 0) {
|
||||
if (streamTaskRun(pTask) < 0) {
|
||||
atomic_store_8(&pTask->status, TASK_STATUS__CLOSING);
|
||||
|
||||
goto FAIL;
|
||||
|
|
|
@ -133,7 +133,144 @@ int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input) {
|
|||
return inputStatus;
|
||||
}
|
||||
|
||||
int32_t streamTaskProcessTriggerReq(SStreamTask* pTask, SMsgCb* pMsgCb, char* msg, int32_t msgLen) {
|
||||
int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) {
|
||||
void* exec = pTask->exec.runners[0].executor;
|
||||
|
||||
// set input
|
||||
if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) {
|
||||
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
|
||||
ASSERT(pSubmit->type == STREAM_INPUT__DATA_SUBMIT);
|
||||
|
||||
qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK);
|
||||
} else if (pTask->inputType == STREAM_INPUT__DATA_BLOCK) {
|
||||
SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
|
||||
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
|
||||
|
||||
SArray* blocks = pBlock->blocks;
|
||||
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK);
|
||||
}
|
||||
|
||||
// exec
|
||||
while (1) {
|
||||
SSDataBlock* output;
|
||||
uint64_t ts = 0;
|
||||
if (qExecTask(exec, &output, &ts) < 0) {
|
||||
ASSERT(false);
|
||||
}
|
||||
if (output == NULL) break;
|
||||
taosArrayPush(pRes, &output);
|
||||
}
|
||||
|
||||
// destroy
|
||||
if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) {
|
||||
streamDataSubmitRefDec((SStreamDataSubmit*)data);
|
||||
} else {
|
||||
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
// TODO: handle version
|
||||
int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||
if (pRes == NULL) return -1;
|
||||
while (1) {
|
||||
int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
|
||||
void* exec = pTask->exec.runners[0].executor;
|
||||
if (execStatus == TASK_STATUS__IDLE) {
|
||||
// first run, from qall, handle failure from last exec
|
||||
while (1) {
|
||||
void* data = NULL;
|
||||
taosGetQitem(pTask->inputQAll, &data);
|
||||
if (data == NULL) break;
|
||||
|
||||
streamTaskExecImpl(pTask, data, pRes);
|
||||
|
||||
taosFreeQitem(data);
|
||||
|
||||
if (taosArrayGetSize(pRes) != 0) {
|
||||
SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM);
|
||||
resQ->type = STREAM_INPUT__DATA_BLOCK;
|
||||
resQ->blocks = pRes;
|
||||
taosWriteQitem(pTask->outputQ, resQ);
|
||||
pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||
if (pRes == NULL) goto FAIL;
|
||||
}
|
||||
}
|
||||
// second run, from inputQ
|
||||
taosReadAllQitems(pTask->inputQ, pTask->inputQAll);
|
||||
while (1) {
|
||||
void* data = NULL;
|
||||
taosGetQitem(pTask->inputQAll, &data);
|
||||
if (data == NULL) break;
|
||||
|
||||
streamTaskExecImpl(pTask, data, pRes);
|
||||
|
||||
taosFreeQitem(data);
|
||||
|
||||
if (taosArrayGetSize(pRes) != 0) {
|
||||
SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM);
|
||||
resQ->type = STREAM_INPUT__DATA_BLOCK;
|
||||
resQ->blocks = pRes;
|
||||
taosWriteQitem(pTask->outputQ, resQ);
|
||||
pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||
if (pRes == NULL) goto FAIL;
|
||||
}
|
||||
}
|
||||
// set status closing
|
||||
atomic_store_8(&pTask->status, TASK_STATUS__CLOSING);
|
||||
// third run, make sure all inputQ is cleared
|
||||
taosReadAllQitems(pTask->inputQ, pTask->inputQAll);
|
||||
while (1) {
|
||||
void* data = NULL;
|
||||
taosGetQitem(pTask->inputQAll, &data);
|
||||
if (data == NULL) break;
|
||||
|
||||
streamTaskExecImpl(pTask, data, pRes);
|
||||
|
||||
taosFreeQitem(data);
|
||||
|
||||
if (taosArrayGetSize(pRes) != 0) {
|
||||
SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM);
|
||||
resQ->type = STREAM_INPUT__DATA_BLOCK;
|
||||
resQ->blocks = pRes;
|
||||
taosWriteQitem(pTask->outputQ, resQ);
|
||||
pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||
if (pRes == NULL) goto FAIL;
|
||||
}
|
||||
}
|
||||
// set status closing
|
||||
atomic_store_8(&pTask->status, TASK_STATUS__CLOSING);
|
||||
// third run, make sure all inputQ is cleared
|
||||
taosReadAllQitems(pTask->inputQ, pTask->inputQAll);
|
||||
while (1) {
|
||||
void* data = NULL;
|
||||
taosGetQitem(pTask->inputQAll, &data);
|
||||
if (data == NULL) break;
|
||||
}
|
||||
|
||||
atomic_store_8(&pTask->status, TASK_STATUS__IDLE);
|
||||
break;
|
||||
} else if (execStatus == TASK_STATUS__CLOSING) {
|
||||
continue;
|
||||
} else if (execStatus == TASK_STATUS__EXECUTING) {
|
||||
break;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
FAIL:
|
||||
atomic_store_8(&pTask->status, TASK_STATUS__IDLE);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t streamTaskDispatchDown(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||
//
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskSink(SStreamTask* pTask) {
|
||||
//
|
||||
return 0;
|
||||
}
|
||||
|
@ -156,8 +293,8 @@ int32_t streamTaskProcessInputReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDat
|
|||
// 2.3. closing: keep trying
|
||||
while (1) {
|
||||
int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
|
||||
void* exec = pTask->exec.runners[0].executor;
|
||||
if (execStatus == TASK_STATUS__IDLE) {
|
||||
void* exec = pTask->exec.runners[0].executor;
|
||||
SArray* pRes = taosArrayInit(0, sizeof(void*));
|
||||
const SArray* blocks = pBlock->blocks;
|
||||
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK);
|
||||
|
@ -278,7 +415,7 @@ int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, char* msg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskExecNew(SStreamTask* pTask) {
|
||||
int32_t streamTaskRun(SStreamTask* pTask) {
|
||||
SArray* pRes = NULL;
|
||||
if (pTask->execType == TASK_EXEC__PIPE || pTask->execType == TASK_EXEC__MERGE) {
|
||||
// TODO remove multi runner
|
||||
|
@ -494,11 +631,16 @@ SStreamTask* tNewSStreamTask(int64_t streamId) {
|
|||
|
||||
pTask->inputQ = taosOpenQueue();
|
||||
pTask->outputQ = taosOpenQueue();
|
||||
if (pTask->inputQ == NULL || pTask->outputQ == NULL) goto FAIL;
|
||||
pTask->inputQAll = taosAllocateQall();
|
||||
pTask->outputQAll = taosAllocateQall();
|
||||
if (pTask->inputQ == NULL || pTask->outputQ == NULL || pTask->inputQAll == NULL || pTask->outputQAll == NULL)
|
||||
goto FAIL;
|
||||
return pTask;
|
||||
FAIL:
|
||||
if (pTask->inputQ) taosCloseQueue(pTask->inputQ);
|
||||
if (pTask->outputQ) taosCloseQueue(pTask->outputQ);
|
||||
if (pTask->inputQAll) taosFreeQall(pTask->inputQAll);
|
||||
if (pTask->outputQAll) taosFreeQall(pTask->outputQAll);
|
||||
if (pTask) taosMemoryFree(pTask);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -507,6 +649,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
|||
/*if (tStartEncode(pEncoder) < 0) return -1;*/
|
||||
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pTask->inputType) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pTask->status) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pTask->sourceType) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1;
|
||||
|
@ -552,6 +695,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
|||
/*if (tStartDecode(pDecoder) < 0) return -1;*/
|
||||
if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pTask->inputType) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pTask->sourceType) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1;
|
||||
|
|
Loading…
Reference in New Issue