Merge pull request #15841 from taosdata/feature/stream
refactor(stream): add stream meta
This commit is contained in:
commit
3d837cf885
|
@ -476,8 +476,10 @@ typedef struct SStreamMeta {
|
|||
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc);
|
||||
void streamMetaClose(SStreamMeta* streamMeta);
|
||||
|
||||
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
|
||||
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen);
|
||||
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
|
||||
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
|
||||
|
||||
int32_t streamMetaBegin(SStreamMeta* pMeta);
|
||||
int32_t streamMetaCommit(SStreamMeta* pMeta);
|
||||
|
|
|
@ -117,10 +117,9 @@ typedef struct {
|
|||
struct STQ {
|
||||
SVnode* pVnode;
|
||||
char* path;
|
||||
SHashObj* pushMgr; // consumerId -> STqHandle*
|
||||
SHashObj* handles; // subKey -> STqHandle
|
||||
SHashObj* pStreamTasks; // taksId -> SStreamTask
|
||||
SHashObj* pAlterInfo; // topic -> SAlterCheckInfo
|
||||
SHashObj* pushMgr; // consumerId -> STqHandle*
|
||||
SHashObj* handles; // subKey -> STqHandle
|
||||
SHashObj* pAlterInfo; // topic -> SAlterCheckInfo
|
||||
|
||||
STqOffsetStore* pOffsetStore;
|
||||
|
||||
|
@ -129,9 +128,7 @@ struct STQ {
|
|||
|
||||
TTB* pAlterInfoStore;
|
||||
|
||||
TDB* pStreamStore;
|
||||
TTB* pTaskDb;
|
||||
TTB* pTaskState;
|
||||
SStreamMeta* pStreamMeta;
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
|
@ -188,6 +185,9 @@ static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ve
|
|||
pOffsetVal->version = ver;
|
||||
}
|
||||
|
||||
// tqStream
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -62,8 +62,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
|
|||
|
||||
pTq->handles = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||
|
||||
pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||
|
||||
pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
||||
|
||||
pTq->pAlterInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||
|
@ -76,6 +74,11 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
|
|||
ASSERT(0);
|
||||
}
|
||||
|
||||
pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask);
|
||||
if (pTq->pStreamMeta == NULL) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
return pTq;
|
||||
}
|
||||
|
||||
|
@ -83,18 +86,11 @@ void tqClose(STQ* pTq) {
|
|||
if (pTq) {
|
||||
tqOffsetClose(pTq->pOffsetStore);
|
||||
taosHashCleanup(pTq->handles);
|
||||
void* pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
|
||||
if (pIter == NULL) break;
|
||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||
tFreeSStreamTask(pTask);
|
||||
}
|
||||
taosHashCleanup(pTq->pStreamTasks);
|
||||
taosHashCleanup(pTq->pushMgr);
|
||||
taosHashCleanup(pTq->pAlterInfo);
|
||||
taosMemoryFree(pTq->path);
|
||||
tqMetaClose(pTq);
|
||||
streamMetaClose(pTq->pStreamMeta);
|
||||
taosMemoryFree(pTq);
|
||||
}
|
||||
}
|
||||
|
@ -672,6 +668,9 @@ FAIL:
|
|||
}
|
||||
|
||||
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||
//
|
||||
return streamMetaAddSerializedTask(pTq->pStreamMeta, msg, msgLen);
|
||||
#if 0
|
||||
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||
if (pTask == NULL) {
|
||||
return -1;
|
||||
|
@ -695,6 +694,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
FAIL:
|
||||
if (pTask) taosMemoryFree(pTask);
|
||||
return -1;
|
||||
#endif
|
||||
}
|
||||
|
||||
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
|
||||
|
@ -710,7 +710,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
|
|||
}
|
||||
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
|
||||
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
|
||||
if (pIter == NULL) break;
|
||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
|
||||
|
@ -744,9 +744,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
//
|
||||
SStreamTaskRunReq* pReq = pMsg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
if (ppTask) {
|
||||
streamProcessRunReq(*ppTask);
|
||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
|
||||
if (pTask) {
|
||||
streamProcessRunReq(pTask);
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
|
@ -762,14 +762,15 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
|
|||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
|
||||
tDecodeStreamDispatchReq(&decoder, &req);
|
||||
int32_t taskId = req.taskId;
|
||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
if (ppTask) {
|
||||
int32_t taskId = req.taskId;
|
||||
|
||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
|
||||
if (pTask) {
|
||||
SRpcMsg rsp = {
|
||||
.info = pMsg->info,
|
||||
.code = 0,
|
||||
};
|
||||
streamProcessDispatchReq(*ppTask, &req, &rsp, exec);
|
||||
streamProcessDispatchReq(pTask, &req, &rsp, exec);
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
|
@ -779,9 +780,9 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
|
|||
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
SStreamTaskRecoverReq* pReq = pMsg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
if (ppTask) {
|
||||
streamProcessRecoverReq(*ppTask, pReq, pMsg);
|
||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
|
||||
if (pTask) {
|
||||
streamProcessRecoverReq(pTask, pReq, pMsg);
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
|
@ -791,9 +792,9 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t taskId = pRsp->taskId;
|
||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
if (ppTask) {
|
||||
streamProcessDispatchRsp(*ppTask, pRsp);
|
||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
|
||||
if (pTask) {
|
||||
streamProcessDispatchRsp(pTask, pRsp);
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
|
@ -803,9 +804,10 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
|||
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||
SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
|
||||
int32_t taskId = pRsp->rspTaskId;
|
||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
if (ppTask) {
|
||||
streamProcessRecoverRsp(*ppTask, pRsp);
|
||||
|
||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
|
||||
if (pTask) {
|
||||
streamProcessRecoverRsp(pTask, pRsp);
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
|
@ -815,18 +817,7 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
|
|||
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
||||
|
||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
|
||||
if (ppTask) {
|
||||
SStreamTask* pTask = *ppTask;
|
||||
taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
|
||||
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
|
||||
}
|
||||
// todo
|
||||
// clear queue
|
||||
// push drop req into queue
|
||||
// launch exec to free memory
|
||||
// remove from hash
|
||||
return 0;
|
||||
return streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
|
@ -837,18 +828,18 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, msgBody, msgLen);
|
||||
tDecodeStreamRetrieveReq(&decoder, &req);
|
||||
int32_t taskId = req.dstTaskId;
|
||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
if (ppTask) {
|
||||
int32_t taskId = req.dstTaskId;
|
||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
|
||||
if (pTask) {
|
||||
SRpcMsg rsp = {
|
||||
.info = pMsg->info,
|
||||
.code = 0,
|
||||
};
|
||||
streamProcessRetrieveReq(*ppTask, &req, &rsp);
|
||||
streamProcessRetrieveReq(pTask, &req, &rsp);
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||
|
@ -871,16 +862,18 @@ void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
|
|||
goto FAIL;
|
||||
}
|
||||
|
||||
int32_t taskId = req.taskId;
|
||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
if (ppTask) {
|
||||
int32_t taskId = req.taskId;
|
||||
|
||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
|
||||
if (pTask) {
|
||||
SRpcMsg rsp = {
|
||||
.info = pMsg->info,
|
||||
.code = 0,
|
||||
};
|
||||
streamProcessDispatchReq(*ppTask, &req, &rsp, false);
|
||||
streamProcessDispatchReq(pTask, &req, &rsp, false);
|
||||
return;
|
||||
}
|
||||
|
||||
FAIL:
|
||||
if (pMsg->info.handle == NULL) return;
|
||||
SRpcMsg rsp = {
|
||||
|
|
|
@ -215,7 +215,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
walApplyVer(pTq->pVnode->pWal, ver);
|
||||
|
||||
if (msgType == TDMT_VND_SUBMIT) {
|
||||
if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0;
|
||||
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0;
|
||||
|
||||
void* data = taosMemoryMalloc(msgLen);
|
||||
if (data == NULL) {
|
||||
|
|
|
@ -413,7 +413,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
|||
}
|
||||
}
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
|
||||
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
|
||||
if (pIter == NULL) break;
|
||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
||||
|
|
|
@ -36,8 +36,18 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
|||
goto _err;
|
||||
}
|
||||
|
||||
pMeta->pTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||
if (pMeta->pTasks == NULL) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (streamMetaBegin(pMeta) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pMeta->ahandle = ahandle;
|
||||
pMeta->expandFunc = expandFunc;
|
||||
|
||||
return pMeta;
|
||||
_err:
|
||||
return NULL;
|
||||
|
@ -48,6 +58,48 @@ void streamMetaClose(SStreamMeta* pMeta) {
|
|||
tdbTbClose(pMeta->pTaskDb);
|
||||
tdbTbClose(pMeta->pStateDb);
|
||||
tdbClose(pMeta->db);
|
||||
|
||||
void* pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pMeta->pTasks, pIter);
|
||||
if (pIter == NULL) break;
|
||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||
tFreeSStreamTask(pTask);
|
||||
}
|
||||
taosHashCleanup(pMeta->pTasks);
|
||||
taosMemoryFree(pMeta->path);
|
||||
taosMemoryFree(pMeta);
|
||||
}
|
||||
|
||||
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
|
||||
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||
if (pTask == NULL) {
|
||||
return -1;
|
||||
}
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
|
||||
if (tDecodeSStreamTask(&decoder, pTask) < 0) {
|
||||
ASSERT(0);
|
||||
goto FAIL;
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) {
|
||||
ASSERT(0);
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
|
||||
|
||||
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), msg, msgLen, &pMeta->txn) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
|
||||
FAIL:
|
||||
if (pTask) taosMemoryFree(pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||
|
@ -80,6 +132,16 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) {
|
||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
|
||||
if (ppTask) {
|
||||
ASSERT((*ppTask)->taskId == taskId);
|
||||
return *ppTask;
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
|
||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
|
||||
if (ppTask) {
|
||||
|
|
Loading…
Reference in New Issue