Merge pull request #14972 from taosdata/feature/stream
feat(stream): pipelined exec for recovering
This commit is contained in:
commit
53d8c2d79a
|
@ -31,6 +31,11 @@ extern "C" {
|
|||
|
||||
typedef struct SStreamTask SStreamTask;
|
||||
|
||||
enum {
|
||||
STREAM_STATUS__NORMAL = 0,
|
||||
STREAM_STATUS__RECOVER,
|
||||
};
|
||||
|
||||
enum {
|
||||
TASK_STATUS__NORMAL = 0,
|
||||
TASK_STATUS__DROPPING,
|
||||
|
|
|
@ -177,7 +177,7 @@ static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStream
|
|||
|
||||
taosWLockLatch(&pOldStream->lock);
|
||||
|
||||
// TODO handle update
|
||||
pOldStream->status = pNewStream->status;
|
||||
|
||||
taosWUnLockLatch(&pOldStream->lock);
|
||||
return 0;
|
||||
|
@ -395,6 +395,20 @@ int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStr
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetStreamRecover(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream) {
|
||||
SStreamObj streamObj = {0};
|
||||
memcpy(streamObj.name, pStream->name, TSDB_STREAM_FNAME_LEN);
|
||||
streamObj.status = STREAM_STATUS__RECOVER;
|
||||
SSdbRaw *pCommitRaw = mndStreamActionEncode(&streamObj);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
|
||||
SStbObj *pStb = NULL;
|
||||
SDbObj *pDb = NULL;
|
||||
|
@ -492,6 +506,76 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndPersistTaskRecoverReq(STrans *pTrans, SStreamTask *pTask) {
|
||||
SMStreamTaskRecoverReq *pReq = taosMemoryCalloc(1, sizeof(SMStreamTaskRecoverReq));
|
||||
if (pReq == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pReq->streamId = pTask->streamId;
|
||||
pReq->taskId = pTask->taskId;
|
||||
int32_t len;
|
||||
int32_t code;
|
||||
tEncodeSize(tEncodeSMStreamTaskRecoverReq, pReq, len, code);
|
||||
if (code != 0) {
|
||||
return -1;
|
||||
}
|
||||
void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
|
||||
if (buf == NULL) {
|
||||
return -1;
|
||||
}
|
||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, abuf, len);
|
||||
tEncodeSMStreamTaskRecoverReq(&encoder, pReq);
|
||||
((SMsgHead *)buf)->vgId = pTask->nodeId;
|
||||
|
||||
STransAction action = {0};
|
||||
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
|
||||
action.pCont = buf;
|
||||
action.contLen = sizeof(SMsgHead) + len;
|
||||
action.msgType = TDMT_STREAM_TASK_RECOVER;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(buf);
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||
if (pStream->isDistributed) {
|
||||
int32_t lv = taosArrayGetSize(pStream->tasks);
|
||||
for (int32_t i = 0; i < lv; i++) {
|
||||
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
|
||||
int32_t sz = taosArrayGetSize(pTasks);
|
||||
SStreamTask *pTask = taosArrayGetP(pTasks, 0);
|
||||
if (!pTask->isDataScan && pTask->execType != TASK_EXEC__NONE) {
|
||||
ASSERT(sz == 1);
|
||||
if (mndPersistTaskRecoverReq(pTrans, pTask) < 0) {
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
int32_t lv = taosArrayGetSize(pStream->tasks);
|
||||
for (int32_t i = 0; i < lv; i++) {
|
||||
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
|
||||
int32_t sz = taosArrayGetSize(pTasks);
|
||||
for (int32_t j = 0; j < sz; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pTasks, j);
|
||||
if (!pTask->isDataScan) break;
|
||||
ASSERT(pTask->execType != TASK_EXEC__NONE);
|
||||
if (mndPersistTaskRecoverReq(pTrans, pTask) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||
int32_t lv = taosArrayGetSize(pStream->tasks);
|
||||
for (int32_t i = 0; i < lv; i++) {
|
||||
|
@ -712,14 +796,14 @@ static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq) {
|
|||
mDebug("trans:%d, used to drop stream:%s", pTrans->id, recoverReq.name);
|
||||
|
||||
// broadcast to recover all tasks
|
||||
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
|
||||
if (mndRecoverStreamTasks(pMnode, pTrans, pStream) < 0) {
|
||||
mError("stream:%s, failed to recover task since %s", recoverReq.name, terrstr());
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// update stream status
|
||||
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
|
||||
if (mndSetStreamRecover(pMnode, pTrans, pStream) < 0) {
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -30,15 +30,6 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
enum {
|
||||
STREAM_STATUS__RUNNING = 1,
|
||||
STREAM_STATUS__STOPPED,
|
||||
STREAM_STATUS__CREATING,
|
||||
STREAM_STATUS__STOPING,
|
||||
STREAM_STATUS__RESTORING,
|
||||
STREAM_STATUS__DELETING,
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
SHashObj* pHash; // taskId -> SStreamTask
|
||||
} SStreamMeta;
|
||||
|
|
|
@ -33,6 +33,8 @@ typedef struct {
|
|||
static SStreamGlobalEnv streamEnv;
|
||||
|
||||
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb);
|
||||
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum);
|
||||
|
||||
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb);
|
||||
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
|
||||
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
|
||||
|
|
|
@ -82,6 +82,63 @@ static FORCE_INLINE int32_t streamUpdateVer(SStreamTask* pTask, SStreamDataBlock
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
|
||||
ASSERT(pTask->execType != TASK_EXEC__NONE);
|
||||
|
||||
void* exec = pTask->exec.executor;
|
||||
|
||||
while (1) {
|
||||
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||
if (pRes == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t batchCnt = 0;
|
||||
while (1) {
|
||||
SSDataBlock* output = NULL;
|
||||
uint64_t ts = 0;
|
||||
if (qExecTask(exec, &output, &ts) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
if (output == NULL) break;
|
||||
|
||||
SSDataBlock block = {0};
|
||||
assignOneDataBlock(&block, output);
|
||||
block.info.childId = pTask->selfChildId;
|
||||
taosArrayPush(pRes, &block);
|
||||
|
||||
if (++batchCnt >= batchNum) break;
|
||||
}
|
||||
if (taosArrayGetSize(pRes) == 0) {
|
||||
taosArrayDestroy(pRes);
|
||||
break;
|
||||
}
|
||||
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
||||
if (qRes == NULL) {
|
||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||
return -1;
|
||||
}
|
||||
|
||||
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
||||
qRes->blocks = pRes;
|
||||
qRes->childId = pTask->selfChildId;
|
||||
|
||||
if (streamTaskOutput(pTask, qRes) < 0) {
|
||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||
taosFreeQitem(qRes);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
|
||||
ASSERT(pTask->sinkType == TASK_SINK__NONE);
|
||||
streamDispatch(pTask, pTask->pMsgCb);
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
|
||||
int32_t cnt = 0;
|
||||
void* data = NULL;
|
||||
|
|
|
@ -125,7 +125,11 @@ int32_t streamProcessFailRecoverReq(SStreamTask* pTask, SMStreamTaskRecoverReq*
|
|||
}
|
||||
|
||||
if (pTask->taskStatus == TASK_STATUS__RECOVERING) {
|
||||
streamProcessRunReq(pTask);
|
||||
if (streamPipelineExec(pTask, 10) < 0) {
|
||||
// set fail
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue