enh(stream): directly dispatch
This commit is contained in:
parent
b084dd8ee6
commit
d43eec5d16
|
@ -479,7 +479,7 @@ int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId);
|
||||||
int32_t streamSetupTrigger(SStreamTask* pTask);
|
int32_t streamSetupTrigger(SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamProcessRunReq(SStreamTask* pTask);
|
int32_t streamProcessRunReq(SStreamTask* pTask);
|
||||||
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
|
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
|
||||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp);
|
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp);
|
||||||
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
|
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
|
||||||
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
|
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
|
||||||
|
@ -487,6 +487,18 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
|
||||||
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
|
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
|
||||||
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
|
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
|
||||||
|
|
||||||
|
typedef struct SStreamMeta SStreamMeta;
|
||||||
|
|
||||||
|
SStreamMeta* streamMetaOpen();
|
||||||
|
void streamMetaClose(SStreamMeta* streamMeta);
|
||||||
|
|
||||||
|
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||||
|
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
|
||||||
|
|
||||||
|
int32_t streamMetaBegin(SStreamMeta* pMeta);
|
||||||
|
int32_t streamMetaCommit(SStreamMeta* pMeta);
|
||||||
|
int32_t streamMetaRollBack(SStreamMeta* pMeta);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -165,7 +165,11 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
||||||
break;
|
break;
|
||||||
case STREAM_QUEUE:
|
case STREAM_QUEUE:
|
||||||
dGTrace("vgId:%d, msg:%p put into vnode-stream queue", pVnode->vgId, pMsg);
|
dGTrace("vgId:%d, msg:%p put into vnode-stream queue", pVnode->vgId, pMsg);
|
||||||
|
if (pMsg->msgType == TDMT_STREAM_TASK_DISPATCH) {
|
||||||
|
vnodeEnqueueStreamMsg(pVnode->pImpl, pMsg);
|
||||||
|
} else {
|
||||||
taosWriteQitem(pVnode->pStreamQ, pMsg);
|
taosWriteQitem(pVnode->pStreamQ, pMsg);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case FETCH_QUEUE:
|
case FETCH_QUEUE:
|
||||||
dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg);
|
dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg);
|
||||||
|
|
|
@ -30,6 +30,7 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#if 0
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SHashObj* pHash; // taskId -> SStreamTask
|
SHashObj* pHash; // taskId -> SStreamTask
|
||||||
} SStreamMeta;
|
} SStreamMeta;
|
||||||
|
@ -49,6 +50,7 @@ int32_t sndMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
|
||||||
int32_t sndDropTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
|
int32_t sndDropTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
|
||||||
int32_t sndStopTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
|
int32_t sndStopTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
|
||||||
int32_t sndResumeTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
|
int32_t sndResumeTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
|
||||||
|
#endif
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,12 @@
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
#include "sndInt.h"
|
#include "sndInt.h"
|
||||||
#include "tuuid.h"
|
#include "tuuid.h"
|
||||||
|
SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { return NULL; }
|
||||||
|
void sndClose(SSnode *pSnode) {}
|
||||||
|
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { return 0; }
|
||||||
|
int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { return 0; }
|
||||||
|
|
||||||
|
#if 0
|
||||||
SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
|
SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
|
||||||
SSnode *pSnode = taosMemoryCalloc(1, sizeof(SSnode));
|
SSnode *pSnode = taosMemoryCalloc(1, sizeof(SSnode));
|
||||||
if (pSnode == NULL) {
|
if (pSnode == NULL) {
|
||||||
|
@ -151,7 +156,7 @@ static int32_t sndProcessTaskDispatchReq(SSnode *pNode, SRpcMsg *pMsg) {
|
||||||
.info = pMsg->info,
|
.info = pMsg->info,
|
||||||
.code = 0,
|
.code = 0,
|
||||||
};
|
};
|
||||||
streamProcessDispatchReq(pTask, &req, &rsp);
|
streamProcessDispatchReq(pTask, &req, &rsp, true);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -263,3 +268,4 @@ int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
|
@ -183,6 +183,8 @@ bool tqNextDataBlock(STqReader *pReader);
|
||||||
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
|
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
|
||||||
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
|
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
|
||||||
|
|
||||||
|
void vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
|
|
||||||
// sma
|
// sma
|
||||||
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
|
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
|
||||||
|
|
||||||
|
|
|
@ -115,15 +115,23 @@ typedef struct {
|
||||||
} STqHandle;
|
} STqHandle;
|
||||||
|
|
||||||
struct STQ {
|
struct STQ {
|
||||||
|
SVnode* pVnode;
|
||||||
char* path;
|
char* path;
|
||||||
SHashObj* pushMgr; // consumerId -> STqHandle*
|
SHashObj* pushMgr; // consumerId -> STqHandle*
|
||||||
SHashObj* handles; // subKey -> STqHandle
|
SHashObj* handles; // subKey -> STqHandle
|
||||||
SHashObj* pStreamTasks; // taksId -> SStreamTask
|
SHashObj* pStreamTasks; // taksId -> SStreamTask
|
||||||
SHashObj* pAlterInfo; // topic -> SAlterCheckInfo
|
SHashObj* pAlterInfo; // topic -> SAlterCheckInfo
|
||||||
|
|
||||||
STqOffsetStore* pOffsetStore;
|
STqOffsetStore* pOffsetStore;
|
||||||
SVnode* pVnode;
|
|
||||||
TDB* pMetaStore;
|
TDB* pMetaStore;
|
||||||
TTB* pExecStore;
|
TTB* pExecStore;
|
||||||
|
|
||||||
|
TTB* pAlterInfoStore;
|
||||||
|
|
||||||
|
TDB* pStreamStore;
|
||||||
|
TTB* pTaskDb;
|
||||||
|
TTB* pTaskState;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -162,7 +162,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen);
|
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver);
|
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver);
|
||||||
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
|
||||||
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
|
|
|
@ -280,8 +280,6 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tqInitMetaRsp(SMqMetaRsp* pRsp, const SMqPollReq* pReq) { return 0; }
|
|
||||||
|
|
||||||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SMqPollReq* pReq = pMsg->pCont;
|
SMqPollReq* pReq = pMsg->pCont;
|
||||||
int64_t consumerId = pReq->consumerId;
|
int64_t consumerId = pReq->consumerId;
|
||||||
|
@ -386,6 +384,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) {
|
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) {
|
||||||
|
ASSERT(fetchOffsetNew.type == TMQ_OFFSET__LOG);
|
||||||
int64_t fetchVer = fetchOffsetNew.version + 1;
|
int64_t fetchVer = fetchOffsetNew.version + 1;
|
||||||
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
||||||
if (pCkHead == NULL) {
|
if (pCkHead == NULL) {
|
||||||
|
@ -461,22 +460,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
goto OVER;
|
goto OVER;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pCkHead);
|
|
||||||
#if 0
|
|
||||||
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
|
||||||
tqInfo("retrieve using snapshot actual offset: uid %" PRId64 " ts %" PRId64, fetchOffsetNew.uid, fetchOffsetNew.ts);
|
|
||||||
if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 4. send rsp
|
|
||||||
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
|
||||||
code = -1;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
OVER:
|
OVER:
|
||||||
|
@ -614,17 +597,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
|
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
|
||||||
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
int32_t code = 0;
|
||||||
if (pTask == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
SDecoder decoder;
|
|
||||||
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
|
|
||||||
if (tDecodeSStreamTask(&decoder, pTask) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
tDecoderClear(&decoder);
|
|
||||||
ASSERT(pTask->isDataScan == 0 || pTask->isDataScan == 1);
|
ASSERT(pTask->isDataScan == 0 || pTask->isDataScan == 1);
|
||||||
if (pTask->isDataScan == 0 && pTask->sinkType == TASK_SINK__NONE) {
|
if (pTask->isDataScan == 0 && pTask->sinkType == TASK_SINK__NONE) {
|
||||||
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
|
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
|
||||||
|
@ -634,11 +608,15 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
|
|
||||||
pTask->inputQueue = streamQueueOpen();
|
pTask->inputQueue = streamQueueOpen();
|
||||||
pTask->outputQueue = streamQueueOpen();
|
pTask->outputQueue = streamQueueOpen();
|
||||||
|
|
||||||
|
if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
|
||||||
|
code = -1;
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
||||||
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
|
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
|
||||||
|
|
||||||
if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) goto FAIL;
|
|
||||||
|
|
||||||
pTask->pMsgCb = &pTq->pVnode->msgCb;
|
pTask->pMsgCb = &pTq->pVnode->msgCb;
|
||||||
|
|
||||||
// exec
|
// exec
|
||||||
|
@ -683,15 +661,35 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
tqInfo("deploy stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId,
|
tqInfo("deploy stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId,
|
||||||
pTask->selfChildId);
|
pTask->selfChildId);
|
||||||
|
|
||||||
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
|
|
||||||
|
|
||||||
/*SMeta* pMeta = pTq->pVnode->pMeta;*/
|
|
||||||
/*tdbTbUpsert(pMeta->pTaskIdx, &pTask->taskId, sizeof(int32_t), msg, msgLen, &pMeta->txn);*/
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
FAIL:
|
FAIL:
|
||||||
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
|
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
|
||||||
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
|
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
|
||||||
|
// TODO free executor
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
|
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||||
|
if (pTask == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
SDecoder decoder;
|
||||||
|
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
|
||||||
|
if (tDecodeSStreamTask(&decoder, pTask) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
|
if (tqExpandTask(pTq, pTask) < 0) {
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
FAIL:
|
||||||
if (pTask) taosMemoryFree(pTask);
|
if (pTask) taosMemoryFree(pTask);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -752,7 +750,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
|
||||||
char* msgStr = pMsg->pCont;
|
char* msgStr = pMsg->pCont;
|
||||||
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
|
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
|
||||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||||
|
@ -767,7 +765,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
.info = pMsg->info,
|
.info = pMsg->info,
|
||||||
.code = 0,
|
.code = 0,
|
||||||
};
|
};
|
||||||
streamProcessDispatchReq(*ppTask, &req, &rsp);
|
streamProcessDispatchReq(*ppTask, &req, &rsp, exec);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -825,16 +823,6 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
// launch exec to free memory
|
// launch exec to free memory
|
||||||
// remove from hash
|
// remove from hash
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t code = taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
|
|
||||||
// set status dropping
|
|
||||||
ASSERT(code == 0);
|
|
||||||
if (code == 0) {
|
|
||||||
// sendrsp
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
@ -863,3 +851,37 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
//
|
//
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
|
||||||
|
STQ* pTq = pVnode->pTq;
|
||||||
|
char* msgStr = pMsg->pCont;
|
||||||
|
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
|
||||||
|
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
SStreamDispatchReq req;
|
||||||
|
SDecoder decoder;
|
||||||
|
tDecoderInit(&decoder, msgBody, msgLen);
|
||||||
|
if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
|
||||||
|
code = TSDB_CODE_MSG_DECODE_ERROR;
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t taskId = req.taskId;
|
||||||
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||||
|
if (ppTask) {
|
||||||
|
SRpcMsg rsp = {
|
||||||
|
.info = pMsg->info,
|
||||||
|
.code = 0,
|
||||||
|
};
|
||||||
|
streamProcessDispatchReq(*ppTask, &req, &rsp, false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
FAIL:
|
||||||
|
if (pMsg->info.handle == NULL) return;
|
||||||
|
SRpcMsg rsp = {
|
||||||
|
.code = code,
|
||||||
|
.info = pMsg->info,
|
||||||
|
};
|
||||||
|
tmsgSendRsp(&rsp);
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,209 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "meta.h"
|
||||||
|
#include "tdbInt.h"
|
||||||
|
#include "tq.h"
|
||||||
|
|
||||||
|
// STqSnapReader ========================================
|
||||||
|
struct STqSnapReader {
|
||||||
|
STQ* pTq;
|
||||||
|
int64_t sver;
|
||||||
|
int64_t ever;
|
||||||
|
TBC* pCur;
|
||||||
|
};
|
||||||
|
|
||||||
|
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader) {
|
||||||
|
int32_t code = 0;
|
||||||
|
STqSnapReader* pReader = NULL;
|
||||||
|
|
||||||
|
// alloc
|
||||||
|
pReader = (STqSnapReader*)taosMemoryCalloc(1, sizeof(STqSnapReader));
|
||||||
|
if (pReader == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
pReader->pTq = pTq;
|
||||||
|
pReader->sver = sver;
|
||||||
|
pReader->ever = ever;
|
||||||
|
|
||||||
|
// impl
|
||||||
|
code = tdbTbcOpen(pTq->pExecStore, &pReader->pCur, NULL);
|
||||||
|
if (code) {
|
||||||
|
taosMemoryFree(pReader);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tdbTbcMoveToFirst(pReader->pCur);
|
||||||
|
if (code) {
|
||||||
|
taosMemoryFree(pReader);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
tqInfo("vgId:%d vnode snapshot tq reader opened", TD_VID(pTq->pVnode));
|
||||||
|
|
||||||
|
*ppReader = pReader;
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tqError("vgId:%d vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||||
|
*ppReader = NULL;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqSnapReaderClose(STqSnapReader** ppReader) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
tdbTbcClose((*ppReader)->pCur);
|
||||||
|
taosMemoryFree(*ppReader);
|
||||||
|
*ppReader = NULL;
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
|
||||||
|
int32_t code = 0;
|
||||||
|
const void* pKey = NULL;
|
||||||
|
const void* pVal = NULL;
|
||||||
|
int32_t kLen = 0;
|
||||||
|
int32_t vLen = 0;
|
||||||
|
SDecoder decoder;
|
||||||
|
STqHandle handle;
|
||||||
|
|
||||||
|
*ppData = NULL;
|
||||||
|
for (;;) {
|
||||||
|
if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
||||||
|
tDecodeSTqHandle(&decoder, &handle);
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
|
if (handle.snapshotVer <= pReader->sver && handle.snapshotVer >= pReader->ever) {
|
||||||
|
tdbTbcMoveToNext(pReader->pCur);
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
tdbTbcMoveToNext(pReader->pCur);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(pVal && vLen);
|
||||||
|
|
||||||
|
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
|
||||||
|
if (*ppData == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
|
||||||
|
pHdr->type = SNAP_DATA_TQ_HANDLE;
|
||||||
|
pHdr->size = vLen;
|
||||||
|
memcpy(pHdr->data, pVal, vLen);
|
||||||
|
|
||||||
|
tqInfo("vgId:%d vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode),
|
||||||
|
handle.snapshotVer, handle.subKey, vLen);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tqError("vgId:%d vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
// STqSnapWriter ========================================
|
||||||
|
struct STqSnapWriter {
|
||||||
|
STQ* pTq;
|
||||||
|
int64_t sver;
|
||||||
|
int64_t ever;
|
||||||
|
TXN txn;
|
||||||
|
};
|
||||||
|
|
||||||
|
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) {
|
||||||
|
int32_t code = 0;
|
||||||
|
STqSnapWriter* pWriter;
|
||||||
|
|
||||||
|
// alloc
|
||||||
|
pWriter = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
|
||||||
|
if (pWriter == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
pWriter->pTq = pTq;
|
||||||
|
pWriter->sver = sver;
|
||||||
|
pWriter->ever = ever;
|
||||||
|
|
||||||
|
if (tdbTxnOpen(&pWriter->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
*ppWriter = pWriter;
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tqError("vgId:%d tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||||
|
*ppWriter = NULL;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
|
||||||
|
int32_t code = 0;
|
||||||
|
STqSnapWriter* pWriter = *ppWriter;
|
||||||
|
STQ* pTq = pWriter->pTq;
|
||||||
|
|
||||||
|
if (rollback) {
|
||||||
|
ASSERT(0);
|
||||||
|
} else {
|
||||||
|
code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn);
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(pWriter);
|
||||||
|
*ppWriter = NULL;
|
||||||
|
|
||||||
|
// restore from metastore
|
||||||
|
if (tqMetaRestoreHandle(pTq) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tqError("vgId:%d tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||||
|
int32_t code = 0;
|
||||||
|
STQ* pTq = pWriter->pTq;
|
||||||
|
SDecoder decoder = {0};
|
||||||
|
SDecoder* pDecoder = &decoder;
|
||||||
|
STqHandle handle;
|
||||||
|
|
||||||
|
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
||||||
|
code = tDecodeSTqHandle(pDecoder, &handle);
|
||||||
|
if (code) goto _err;
|
||||||
|
code = tqMetaSaveHandle(pTq, handle.subKey, &handle);
|
||||||
|
if (code < 0) goto _err;
|
||||||
|
tDecoderClear(pDecoder);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tDecoderClear(pDecoder);
|
||||||
|
tqError("vgId:%d vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
|
@ -0,0 +1,209 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "meta.h"
|
||||||
|
#include "tdbInt.h"
|
||||||
|
#include "tq.h"
|
||||||
|
|
||||||
|
// STqSnapReader ========================================
|
||||||
|
struct STqSnapReader {
|
||||||
|
STQ* pTq;
|
||||||
|
int64_t sver;
|
||||||
|
int64_t ever;
|
||||||
|
TBC* pCur;
|
||||||
|
};
|
||||||
|
|
||||||
|
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader) {
|
||||||
|
int32_t code = 0;
|
||||||
|
STqSnapReader* pReader = NULL;
|
||||||
|
|
||||||
|
// alloc
|
||||||
|
pReader = (STqSnapReader*)taosMemoryCalloc(1, sizeof(STqSnapReader));
|
||||||
|
if (pReader == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
pReader->pTq = pTq;
|
||||||
|
pReader->sver = sver;
|
||||||
|
pReader->ever = ever;
|
||||||
|
|
||||||
|
// impl
|
||||||
|
code = tdbTbcOpen(pTq->pExecStore, &pReader->pCur, NULL);
|
||||||
|
if (code) {
|
||||||
|
taosMemoryFree(pReader);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tdbTbcMoveToFirst(pReader->pCur);
|
||||||
|
if (code) {
|
||||||
|
taosMemoryFree(pReader);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
tqInfo("vgId:%d vnode snapshot tq reader opened", TD_VID(pTq->pVnode));
|
||||||
|
|
||||||
|
*ppReader = pReader;
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tqError("vgId:%d vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||||
|
*ppReader = NULL;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqSnapReaderClose(STqSnapReader** ppReader) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
tdbTbcClose((*ppReader)->pCur);
|
||||||
|
taosMemoryFree(*ppReader);
|
||||||
|
*ppReader = NULL;
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
|
||||||
|
int32_t code = 0;
|
||||||
|
const void* pKey = NULL;
|
||||||
|
const void* pVal = NULL;
|
||||||
|
int32_t kLen = 0;
|
||||||
|
int32_t vLen = 0;
|
||||||
|
SDecoder decoder;
|
||||||
|
STqHandle handle;
|
||||||
|
|
||||||
|
*ppData = NULL;
|
||||||
|
for (;;) {
|
||||||
|
if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
||||||
|
tDecodeSTqHandle(&decoder, &handle);
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
|
if (handle.snapshotVer <= pReader->sver && handle.snapshotVer >= pReader->ever) {
|
||||||
|
tdbTbcMoveToNext(pReader->pCur);
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
tdbTbcMoveToNext(pReader->pCur);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(pVal && vLen);
|
||||||
|
|
||||||
|
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
|
||||||
|
if (*ppData == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
|
||||||
|
pHdr->type = SNAP_DATA_TQ_HANDLE;
|
||||||
|
pHdr->size = vLen;
|
||||||
|
memcpy(pHdr->data, pVal, vLen);
|
||||||
|
|
||||||
|
tqInfo("vgId:%d vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode),
|
||||||
|
handle.snapshotVer, handle.subKey, vLen);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tqError("vgId:%d vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
// STqSnapWriter ========================================
|
||||||
|
struct STqSnapWriter {
|
||||||
|
STQ* pTq;
|
||||||
|
int64_t sver;
|
||||||
|
int64_t ever;
|
||||||
|
TXN txn;
|
||||||
|
};
|
||||||
|
|
||||||
|
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) {
|
||||||
|
int32_t code = 0;
|
||||||
|
STqSnapWriter* pWriter;
|
||||||
|
|
||||||
|
// alloc
|
||||||
|
pWriter = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
|
||||||
|
if (pWriter == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
pWriter->pTq = pTq;
|
||||||
|
pWriter->sver = sver;
|
||||||
|
pWriter->ever = ever;
|
||||||
|
|
||||||
|
if (tdbTxnOpen(&pWriter->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
*ppWriter = pWriter;
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tqError("vgId:%d tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||||
|
*ppWriter = NULL;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
|
||||||
|
int32_t code = 0;
|
||||||
|
STqSnapWriter* pWriter = *ppWriter;
|
||||||
|
STQ* pTq = pWriter->pTq;
|
||||||
|
|
||||||
|
if (rollback) {
|
||||||
|
ASSERT(0);
|
||||||
|
} else {
|
||||||
|
code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn);
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(pWriter);
|
||||||
|
*ppWriter = NULL;
|
||||||
|
|
||||||
|
// restore from metastore
|
||||||
|
if (tqMetaRestoreHandle(pTq) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tqError("vgId:%d tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||||
|
int32_t code = 0;
|
||||||
|
STQ* pTq = pWriter->pTq;
|
||||||
|
SDecoder decoder = {0};
|
||||||
|
SDecoder* pDecoder = &decoder;
|
||||||
|
STqHandle handle;
|
||||||
|
|
||||||
|
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
||||||
|
code = tDecodeSTqHandle(pDecoder, &handle);
|
||||||
|
if (code) goto _err;
|
||||||
|
code = tqMetaSaveHandle(pTq, handle.subKey, &handle);
|
||||||
|
if (code < 0) goto _err;
|
||||||
|
tDecoderClear(pDecoder);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tDecoderClear(pDecoder);
|
||||||
|
tqError("vgId:%d vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
|
@ -297,8 +297,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
vTrace("message in fetch queue is processing");
|
vTrace("message in fetch queue is processing");
|
||||||
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META ||
|
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
|
||||||
pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType == TDMT_VND_BATCH_META) &&
|
pMsg->msgType == TDMT_VND_BATCH_META) &&
|
||||||
!vnodeIsLeader(pVnode)) {
|
!vnodeIsLeader(pVnode)) {
|
||||||
vnodeRedirectRpcMsg(pVnode, pMsg);
|
vnodeRedirectRpcMsg(pVnode, pMsg);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -330,7 +330,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
case TDMT_STREAM_TASK_RUN:
|
case TDMT_STREAM_TASK_RUN:
|
||||||
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
|
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
|
||||||
case TDMT_STREAM_TASK_DISPATCH:
|
case TDMT_STREAM_TASK_DISPATCH:
|
||||||
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
|
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
|
||||||
case TDMT_STREAM_TASK_RECOVER:
|
case TDMT_STREAM_TASK_RECOVER:
|
||||||
return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
|
return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
|
||||||
case TDMT_STREAM_RETRIEVE:
|
case TDMT_STREAM_RETRIEVE:
|
||||||
|
|
|
@ -8,7 +8,7 @@ target_include_directories(
|
||||||
|
|
||||||
target_link_libraries(
|
target_link_libraries(
|
||||||
stream
|
stream
|
||||||
PRIVATE os util transport qcom executor
|
PRIVATE os util transport qcom executor tdb
|
||||||
)
|
)
|
||||||
|
|
||||||
if(${BUILD_TEST})
|
if(${BUILD_TEST})
|
||||||
|
|
|
@ -175,41 +175,22 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
|
||||||
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
|
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
|
||||||
qDebug("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId,
|
qDebug("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId,
|
||||||
pReq->upstreamTaskId);
|
pReq->upstreamTaskId);
|
||||||
|
|
||||||
// 1. handle input
|
|
||||||
streamTaskEnqueue(pTask, pReq, pRsp);
|
streamTaskEnqueue(pTask, pReq, pRsp);
|
||||||
|
|
||||||
// 2. try exec
|
if (exec) {
|
||||||
// 2.1. idle: exec
|
|
||||||
// 2.2. executing: return
|
|
||||||
// 2.3. closing: keep trying
|
|
||||||
#if 0
|
|
||||||
if (pTask->execType != TASK_EXEC__NONE) {
|
|
||||||
#endif
|
|
||||||
streamExec(pTask);
|
streamExec(pTask);
|
||||||
#if 0
|
|
||||||
} else {
|
|
||||||
ASSERT(pTask->sinkType != TASK_SINK__NONE);
|
|
||||||
while (1) {
|
|
||||||
void* data = streamQueueNextItem(pTask->inputQueue);
|
|
||||||
if (data == NULL) return 0;
|
|
||||||
if (streamTaskOutput(pTask, data) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// 3. handle output
|
|
||||||
// 3.1 check and set status
|
|
||||||
// 3.2 dispatch / sink
|
|
||||||
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
|
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
|
||||||
ASSERT(pTask->sinkType == TASK_SINK__NONE);
|
ASSERT(pTask->sinkType == TASK_SINK__NONE);
|
||||||
streamDispatch(pTask);
|
streamDispatch(pTask);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
streamLaunchByWrite(pTask, pTask->nodeId);
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,159 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "executor.h"
|
||||||
|
#include "tdbInt.h"
|
||||||
|
#include "tstream.h"
|
||||||
|
|
||||||
|
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask);
|
||||||
|
|
||||||
|
typedef struct SStreamMeta {
|
||||||
|
char* path;
|
||||||
|
TDB* db;
|
||||||
|
TTB* pTaskDb;
|
||||||
|
TTB* pStateDb;
|
||||||
|
SHashObj* pTasks;
|
||||||
|
void* ahandle;
|
||||||
|
TXN txn;
|
||||||
|
FTaskExpand* expandFunc;
|
||||||
|
} SStreamMeta;
|
||||||
|
|
||||||
|
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) {
|
||||||
|
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
|
||||||
|
if (pMeta == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pMeta->path = strdup(path);
|
||||||
|
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
// open state storage backend
|
||||||
|
if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pStateDb) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
pMeta->ahandle = ahandle;
|
||||||
|
pMeta->expandFunc = expandFunc;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void streamMetaClose(SStreamMeta* pMeta) {
|
||||||
|
//
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||||
|
void* buf = NULL;
|
||||||
|
if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
|
||||||
|
|
||||||
|
int32_t len;
|
||||||
|
int32_t code;
|
||||||
|
tEncodeSize(tEncodeSStreamTask, pTask, len, code);
|
||||||
|
if (code < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
buf = taosMemoryCalloc(1, sizeof(len));
|
||||||
|
if (buf == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SEncoder encoder;
|
||||||
|
tEncoderInit(&encoder, buf, len);
|
||||||
|
tEncodeSStreamTask(&encoder, pTask);
|
||||||
|
|
||||||
|
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), buf, len, &pMeta->txn) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
|
||||||
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
|
||||||
|
if (ppTask) {
|
||||||
|
SStreamTask* pTask = *ppTask;
|
||||||
|
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
|
||||||
|
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), &pMeta->txn) < 0) {
|
||||||
|
/*return -1;*/
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamMetaBegin(SStreamMeta* pMeta) {
|
||||||
|
if (tdbTxnOpen(&pMeta->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
|
||||||
|
0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbBegin(pMeta->db, &pMeta->txn) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamMetaCommit(SStreamMeta* pMeta) {
|
||||||
|
if (tdbCommit(pMeta->db, &pMeta->txn) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamMetaRollBack(SStreamMeta* pMeta) {
|
||||||
|
// TODO tdb rollback
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
int32_t streamRestoreTask(SStreamMeta* pMeta) {
|
||||||
|
TBC* pCur = NULL;
|
||||||
|
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* pKey = NULL;
|
||||||
|
int32_t kLen = 0;
|
||||||
|
void* pVal = NULL;
|
||||||
|
int32_t vLen = 0;
|
||||||
|
SDecoder decoder;
|
||||||
|
|
||||||
|
tdbTbcMoveToFirst(pCur);
|
||||||
|
|
||||||
|
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
||||||
|
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||||
|
if (pTask == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
||||||
|
tDecodeSStreamTask(&decoder, pTask);
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
Loading…
Reference in New Issue