feat(stream): pipelined exec for recovering
This commit is contained in:
parent
9394e3380f
commit
280ed4f65d
|
@ -31,6 +31,11 @@ extern "C" {
|
||||||
|
|
||||||
typedef struct SStreamTask SStreamTask;
|
typedef struct SStreamTask SStreamTask;
|
||||||
|
|
||||||
|
enum {
|
||||||
|
STREAM_STATUS__NORMAL = 0,
|
||||||
|
STREAM_STATUS__RECOVER,
|
||||||
|
};
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
TASK_STATUS__NORMAL = 0,
|
TASK_STATUS__NORMAL = 0,
|
||||||
TASK_STATUS__DROPPING,
|
TASK_STATUS__DROPPING,
|
||||||
|
|
|
@ -177,7 +177,7 @@ static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStream
|
||||||
|
|
||||||
taosWLockLatch(&pOldStream->lock);
|
taosWLockLatch(&pOldStream->lock);
|
||||||
|
|
||||||
// TODO handle update
|
pOldStream->status = pNewStream->status;
|
||||||
|
|
||||||
taosWUnLockLatch(&pOldStream->lock);
|
taosWUnLockLatch(&pOldStream->lock);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -395,6 +395,20 @@ int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStr
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndSetStreamRecover(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream) {
|
||||||
|
SStreamObj streamObj = {0};
|
||||||
|
memcpy(streamObj.name, pStream->name, TSDB_STREAM_FNAME_LEN);
|
||||||
|
streamObj.status = STREAM_STATUS__RECOVER;
|
||||||
|
SSdbRaw *pCommitRaw = mndStreamActionEncode(&streamObj);
|
||||||
|
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||||
|
mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
|
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
|
||||||
SStbObj *pStb = NULL;
|
SStbObj *pStb = NULL;
|
||||||
SDbObj *pDb = NULL;
|
SDbObj *pDb = NULL;
|
||||||
|
@ -492,6 +506,76 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndPersistTaskRecoverReq(STrans *pTrans, SStreamTask *pTask) {
|
||||||
|
SMStreamTaskRecoverReq *pReq = taosMemoryCalloc(1, sizeof(SMStreamTaskRecoverReq));
|
||||||
|
if (pReq == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
pReq->streamId = pTask->streamId;
|
||||||
|
pReq->taskId = pTask->taskId;
|
||||||
|
int32_t len;
|
||||||
|
int32_t code;
|
||||||
|
tEncodeSize(tEncodeSMStreamTaskRecoverReq, pReq, len, code);
|
||||||
|
if (code != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
|
||||||
|
if (buf == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
SEncoder encoder;
|
||||||
|
tEncoderInit(&encoder, abuf, len);
|
||||||
|
tEncodeSMStreamTaskRecoverReq(&encoder, pReq);
|
||||||
|
((SMsgHead *)buf)->vgId = pTask->nodeId;
|
||||||
|
|
||||||
|
STransAction action = {0};
|
||||||
|
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
|
||||||
|
action.pCont = buf;
|
||||||
|
action.contLen = sizeof(SMsgHead) + len;
|
||||||
|
action.msgType = TDMT_STREAM_TASK_RECOVER;
|
||||||
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||||
|
if (pStream->isDistributed) {
|
||||||
|
int32_t lv = taosArrayGetSize(pStream->tasks);
|
||||||
|
for (int32_t i = 0; i < lv; i++) {
|
||||||
|
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
|
||||||
|
int32_t sz = taosArrayGetSize(pTasks);
|
||||||
|
SStreamTask *pTask = taosArrayGetP(pTasks, 0);
|
||||||
|
if (!pTask->isDataScan && pTask->execType != TASK_EXEC__NONE) {
|
||||||
|
ASSERT(sz == 1);
|
||||||
|
if (mndPersistTaskRecoverReq(pTrans, pTask) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
int32_t lv = taosArrayGetSize(pStream->tasks);
|
||||||
|
for (int32_t i = 0; i < lv; i++) {
|
||||||
|
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
|
||||||
|
int32_t sz = taosArrayGetSize(pTasks);
|
||||||
|
for (int32_t j = 0; j < sz; j++) {
|
||||||
|
SStreamTask *pTask = taosArrayGetP(pTasks, j);
|
||||||
|
if (!pTask->isDataScan) break;
|
||||||
|
ASSERT(pTask->execType != TASK_EXEC__NONE);
|
||||||
|
if (mndPersistTaskRecoverReq(pTrans, pTask) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||||
int32_t lv = taosArrayGetSize(pStream->tasks);
|
int32_t lv = taosArrayGetSize(pStream->tasks);
|
||||||
for (int32_t i = 0; i < lv; i++) {
|
for (int32_t i = 0; i < lv; i++) {
|
||||||
|
@ -712,14 +796,14 @@ static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq) {
|
||||||
mDebug("trans:%d, used to drop stream:%s", pTrans->id, recoverReq.name);
|
mDebug("trans:%d, used to drop stream:%s", pTrans->id, recoverReq.name);
|
||||||
|
|
||||||
// broadcast to recover all tasks
|
// broadcast to recover all tasks
|
||||||
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
|
if (mndRecoverStreamTasks(pMnode, pTrans, pStream) < 0) {
|
||||||
mError("stream:%s, failed to recover task since %s", recoverReq.name, terrstr());
|
mError("stream:%s, failed to recover task since %s", recoverReq.name, terrstr());
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// update stream status
|
// update stream status
|
||||||
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
|
if (mndSetStreamRecover(pMnode, pTrans, pStream) < 0) {
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,15 +30,6 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
enum {
|
|
||||||
STREAM_STATUS__RUNNING = 1,
|
|
||||||
STREAM_STATUS__STOPPED,
|
|
||||||
STREAM_STATUS__CREATING,
|
|
||||||
STREAM_STATUS__STOPING,
|
|
||||||
STREAM_STATUS__RESTORING,
|
|
||||||
STREAM_STATUS__DELETING,
|
|
||||||
};
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SHashObj* pHash; // taskId -> SStreamTask
|
SHashObj* pHash; // taskId -> SStreamTask
|
||||||
} SStreamMeta;
|
} SStreamMeta;
|
||||||
|
|
|
@ -33,6 +33,8 @@ typedef struct {
|
||||||
static SStreamGlobalEnv streamEnv;
|
static SStreamGlobalEnv streamEnv;
|
||||||
|
|
||||||
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb);
|
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb);
|
||||||
|
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum);
|
||||||
|
|
||||||
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb);
|
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb);
|
||||||
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
|
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
|
||||||
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
|
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
|
||||||
|
|
|
@ -82,6 +82,63 @@ static FORCE_INLINE int32_t streamUpdateVer(SStreamTask* pTask, SStreamDataBlock
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
|
||||||
|
ASSERT(pTask->execType != TASK_EXEC__NONE);
|
||||||
|
|
||||||
|
void* exec = pTask->exec.executor;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||||
|
if (pRes == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t batchCnt = 0;
|
||||||
|
while (1) {
|
||||||
|
SSDataBlock* output = NULL;
|
||||||
|
uint64_t ts = 0;
|
||||||
|
if (qExecTask(exec, &output, &ts) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
if (output == NULL) break;
|
||||||
|
|
||||||
|
SSDataBlock block = {0};
|
||||||
|
assignOneDataBlock(&block, output);
|
||||||
|
block.info.childId = pTask->selfChildId;
|
||||||
|
taosArrayPush(pRes, &block);
|
||||||
|
|
||||||
|
if (++batchCnt >= batchNum) break;
|
||||||
|
}
|
||||||
|
if (taosArrayGetSize(pRes) == 0) {
|
||||||
|
taosArrayDestroy(pRes);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
||||||
|
if (qRes == NULL) {
|
||||||
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
||||||
|
qRes->blocks = pRes;
|
||||||
|
qRes->childId = pTask->selfChildId;
|
||||||
|
|
||||||
|
if (streamTaskOutput(pTask, qRes) < 0) {
|
||||||
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
|
taosFreeQitem(qRes);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
|
||||||
|
ASSERT(pTask->sinkType == TASK_SINK__NONE);
|
||||||
|
streamDispatch(pTask, pTask->pMsgCb);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
|
static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
|
||||||
int32_t cnt = 0;
|
int32_t cnt = 0;
|
||||||
void* data = NULL;
|
void* data = NULL;
|
||||||
|
|
|
@ -125,7 +125,11 @@ int32_t streamProcessFailRecoverReq(SStreamTask* pTask, SMStreamTaskRecoverReq*
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->taskStatus == TASK_STATUS__RECOVERING) {
|
if (pTask->taskStatus == TASK_STATUS__RECOVERING) {
|
||||||
streamProcessRunReq(pTask);
|
if (streamPipelineExec(pTask, 10) < 0) {
|
||||||
|
// set fail
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue