From 90e4cab2af624ecb5e3680d4bf8b203c4157444e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 28 Mar 2022 14:16:58 +0800 Subject: [PATCH 1/5] fix stream serialization --- include/common/tmsgdef.h | 1 - source/dnode/mgmt/vnode/src/vmMsg.c | 1 - source/dnode/mnode/impl/src/mndDef.c | 17 +++++++++-------- source/dnode/vnode/src/inc/vnd.h | 2 +- source/dnode/vnode/src/tq/tq.c | 6 ++---- source/dnode/vnode/src/vnd/vnodeQuery.c | 5 +++-- source/dnode/vnode/src/vnd/vnodeWrite.c | 5 +++++ source/libs/stream/src/tstream.c | 21 ++++++++++++++++++--- 8 files changed, 38 insertions(+), 20 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 051ee34644..36a489eb59 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -192,7 +192,6 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) - TD_DEF_MSG_TYPE(TDMT_VND_TASK_EXEC, "vnode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_PIPE_EXEC, "vnode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_MERGE_EXEC, "vnode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_WRITE_EXEC, "vnode-task-write-exec", SStreamTaskExecReq, SStreamTaskExecRsp) diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index 97d829571f..1682c6043d 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -279,7 +279,6 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, (NodeMsgFp)vmProcessMergeMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 24f2a5df22..1b3564924a 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -36,11 +36,11 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { if (tEncodeI32(pEncoder, sz) < 0) return -1; for (int32_t i = 0; i < sz; i++) { - SArray *pArray = taosArrayGet(pObj->tasks, i); + SArray *pArray = taosArrayGetP(pObj->tasks, i); int32_t innerSz = taosArrayGetSize(pArray); if (tEncodeI32(pEncoder, innerSz) < 0) return -1; for (int32_t j = 0; j < innerSz; j++) { - SStreamTask *pTask = taosArrayGet(pArray, j); + SStreamTask *pTask = taosArrayGetP(pArray, j); if (tEncodeSStreamTask(pEncoder, pTask) < 0) return -1; } } @@ -76,17 +76,18 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { int32_t sz; if (tDecodeI32(pDecoder, &sz) < 0) return -1; if (sz != 0) { - pObj->tasks = taosArrayInit(sz, sizeof(SArray)); + pObj->tasks = taosArrayInit(sz, sizeof(void *)); for (int32_t i = 0; i < sz; i++) { int32_t innerSz; if (tDecodeI32(pDecoder, &innerSz) < 0) return -1; - SArray *pArray = taosArrayInit(innerSz, sizeof(SStreamTask)); + SArray *pArray = taosArrayInit(innerSz, sizeof(void *)); for (int32_t j = 0; j < innerSz; j++) { - SStreamTask task; - if (tDecodeSStreamTask(pDecoder, &task) < 0) return -1; - taosArrayPush(pArray, &task); + SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); + if (pTask == NULL) return -1; + if (tDecodeSStreamTask(pDecoder, pTask) < 0) return -1; + taosArrayPush(pArray, &pTask); } - taosArrayPush(pObj->tasks, pArray); + taosArrayPush(pObj->tasks, &pArray); } } diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 8d256995c6..3309686bb7 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -198,7 +198,7 @@ int tqCommit(STQ*); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessRebReq(STQ* pTq, char* msg); -int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg); +int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4661668cbe..c354b01501 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -497,11 +497,9 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) { return 0; } -int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) { - char* msgstr = POINTER_SHIFT(msg->pCont, sizeof(SMsgHead)); - +int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen) { SStreamTaskExecReq req; - tDecodeSStreamTaskExecReq(msgstr, &req); + tDecodeSStreamTaskExecReq(msg, &req); int32_t taskId = req.taskId; SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 94e183f525..74d7558e0d 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -43,6 +43,8 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in fetch queue is processing"); + char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); switch (pMsg->msgType) { case TDMT_VND_FETCH: return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg); @@ -65,10 +67,9 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { return vnodeGetTableMeta(pVnode, pMsg); case TDMT_VND_CONSUME: return tqProcessPollReq(pVnode->pTq, pMsg); - case TDMT_VND_TASK_EXEC: case TDMT_VND_TASK_PIPE_EXEC: case TDMT_VND_TASK_MERGE_EXEC: - return tqProcessTaskExec(pVnode->pTq, pMsg); + return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen); case TDMT_VND_STREAM_TRIGGER: return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen); case TDMT_VND_QUERY_HEARTBEAT: diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index a14828cc22..279926747f 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -167,6 +167,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { pMsg->contLen - sizeof(SMsgHead)) < 0) { } } break; + case TDMT_VND_TASK_WRITE_EXEC: { + if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + pMsg->contLen - sizeof(SMsgHead)) < 0) { + } + } break; case TDMT_VND_CREATE_SMA: { // timeRangeSMA #if 0 SSmaCfg vCreateSmaReq = {0}; diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 31a06a9d9a..6d69a8960e 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -205,9 +205,16 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) { if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; } - if (pTask->sinkType != TASK_SINK__NONE) { - // TODO: wrap + if (pTask->sinkType == TASK_SINK__TABLE) { if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1; + } else if (pTask->sinkType == TASK_SINK__SMA) { + if (tEncodeI8(pEncoder, pTask->smaSink.reserved) < 0) return -1; + } else if (pTask->sinkType == TASK_SINK__FETCH) { + if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1; + } else if (pTask->sinkType == TASK_SINK__SHOW) { + if (tEncodeI8(pEncoder, pTask->showSink.reserved) < 0) return -1; + } else { + ASSERT(pTask->sinkType == TASK_SINK__NONE); } if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { @@ -244,8 +251,16 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) { if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; } - if (pTask->sinkType != TASK_SINK__NONE) { + if (pTask->sinkType == TASK_SINK__TABLE) { if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1; + } else if (pTask->sinkType == TASK_SINK__SMA) { + if (tDecodeI8(pDecoder, &pTask->smaSink.reserved) < 0) return -1; + } else if (pTask->sinkType == TASK_SINK__FETCH) { + if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1; + } else if (pTask->sinkType == TASK_SINK__SHOW) { + if (tDecodeI8(pDecoder, &pTask->showSink.reserved) < 0) return -1; + } else { + ASSERT(pTask->sinkType == TASK_SINK__NONE); } if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { From a3b4de6631a33cf8df4cbc4b6106aeb9fe1f7835 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 28 Mar 2022 14:40:22 +0800 Subject: [PATCH 2/5] stream support sma --- include/libs/stream/tstream.h | 9 +++++++-- source/client/src/tmq.c | 2 +- source/dnode/mnode/impl/inc/mndScheduler.h | 2 +- source/dnode/mnode/impl/inc/mndStream.h | 2 +- source/dnode/mnode/impl/src/mndScheduler.c | 6 +++++- source/dnode/mnode/impl/src/mndSma.c | 7 ++++--- source/dnode/mnode/impl/src/mndStream.c | 6 +++--- source/dnode/vnode/src/inc/vnd.h | 3 +++ source/dnode/vnode/src/tq/tq.c | 1 + source/libs/stream/src/tstream.c | 5 +++-- 10 files changed, 29 insertions(+), 14 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 4efddde935..8be9bbbebd 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -82,8 +82,12 @@ typedef struct { SHashObj* pHash; // groupId to tbuid } STaskSinkTb; +typedef void FSmaHandle(void* vnode, int64_t smaId, const SArray* data); + typedef struct { - int8_t reserved; + int64_t smaId; + // following are not applicable to encoder and decoder + FSmaHandle* smaHandle; } STaskSinkSma; typedef struct { @@ -156,7 +160,8 @@ typedef struct { STaskDispatcherShuffle shuffleDispatcher; }; - // state storage + // application storage + void* ahandle; } SStreamTask; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 8eaca6853d..5d00cca76e 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -877,7 +877,7 @@ WRITE_QUEUE_FAIL: } bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { - printf("call update ep %d\n", epoch); + /*printf("call update ep %d\n", epoch);*/ bool set = false; int32_t sz = taosArrayGetSize(pRsp->topics); SArray* newTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); diff --git a/source/dnode/mnode/impl/inc/mndScheduler.h b/source/dnode/mnode/impl/inc/mndScheduler.h index 42951beca2..416061bf34 100644 --- a/source/dnode/mnode/impl/inc/mndScheduler.h +++ b/source/dnode/mnode/impl/inc/mndScheduler.h @@ -27,7 +27,7 @@ void mndCleanupScheduler(SMnode* pMnode); int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub); -int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); +int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index b5d22cb7a5..e7cdd34a7e 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); -int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans); +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans, int64_t smaId); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 69ee1a5696..697811cd04 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -119,7 +119,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { return pVgroup; } -int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { +int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId) { SSdb* pSdb = pMnode->pSdb; SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); if (pPlan == NULL) { @@ -164,6 +164,10 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { // only for inplace pTask->sinkType = TASK_SINK__SHOW; pTask->showSink.reserved = 0; + if (smaId != -1) { + pTask->sinkType = TASK_SINK__SMA; + pTask->smaSink.smaId = smaId; + } } else { pTask->sinkType = TASK_SINK__NONE; } diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 146975aa38..5c62cfa0f2 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -69,7 +69,8 @@ void mndCleanupSma(SMnode *pMnode) {} static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) { terrno = TSDB_CODE_OUT_OF_MEMORY; - int32_t size = sizeof(SSmaObj) + pSma->exprLen + pSma->tagsFilterLen + pSma->sqlLen + pSma->astLen + TSDB_SMA_RESERVE_SIZE; + int32_t size = + sizeof(SSmaObj) + pSma->exprLen + pSma->tagsFilterLen + pSma->sqlLen + pSma->astLen + TSDB_SMA_RESERVE_SIZE; SSdbRaw *pRaw = sdbAllocRaw(SDB_SMA, TSDB_SMA_VER_NUMBER, size); if (pRaw == NULL) goto _OVER; @@ -427,7 +428,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; - if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER; + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans, smaObj.uid) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; @@ -491,7 +492,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) { mError("sma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb); goto _OVER; } - + pStream = mndAcquireStream(pMnode, createReq.name); if (pStream != NULL) { mError("sma:%s, failed to create since stream:%s already exist", createReq.name, createReq.name); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index c02fec0a5f..bbb2f64282 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -246,7 +246,7 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) { return code; } -int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) { +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans, int64_t smaId) { SNode *pAst = NULL; if (nodesStringToNode(ast, &pAst) < 0) { @@ -271,7 +271,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast return -1; } - if (mndScheduleStream(pMnode, pTrans, pStream) < 0) { + if (mndScheduleStream(pMnode, pTrans, pStream, smaId) < 0) { mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr()); return -1; } @@ -310,7 +310,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe } mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name); - if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) { + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans, -1) != 0) { mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 3309686bb7..7b0606512c 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -202,6 +202,9 @@ int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen); +// sma +void smaHandleRes(SVnode* pVnode, int64_t smaId, const SArray* data); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c354b01501..55202335e0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -476,6 +476,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { if (tqExpandTask(pTq, pTask, 4) < 0) { ASSERT(0); } + pTask->ahandle = pTq->pVnode; taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask)); diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 6d69a8960e..028e310a25 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -72,6 +72,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in if (pTask->sinkType == TASK_SINK__TABLE) { // } else if (pTask->sinkType == TASK_SINK__SMA) { + pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes); // } else if (pTask->sinkType == TASK_SINK__FETCH) { // @@ -208,7 +209,7 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) { if (pTask->sinkType == TASK_SINK__TABLE) { if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__SMA) { - if (tEncodeI8(pEncoder, pTask->smaSink.reserved) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__FETCH) { if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__SHOW) { @@ -254,7 +255,7 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) { if (pTask->sinkType == TASK_SINK__TABLE) { if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__SMA) { - if (tDecodeI8(pDecoder, &pTask->smaSink.reserved) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__FETCH) { if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__SHOW) { From df319ccb8db49d39bc5c2c6e597e05396ff3c37b Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 28 Mar 2022 08:06:27 +0000 Subject: [PATCH 3/5] fix more TDB os error --- source/libs/tdb/src/db/tdbPager.c | 4 ++-- source/libs/tdb/src/inc/tdbOs.h | 20 ++++++++++++++++++-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 4fac00d5ad..748633da34 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -80,7 +80,7 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) { // pPager->pCache pPager->pCache = pCache; - pPager->fd = tdbOsOpen(pPager->dbFileName, O_RDWR | O_CREAT); + pPager->fd = tdbOsOpen(pPager->dbFileName, TDB_O_CREAT | TDB_O_RDWR, 0755); if (pPager->fd < 0) { return -1; } @@ -168,7 +168,7 @@ int tdbPagerBegin(SPager *pPager) { } // Open the journal - pPager->jfd = tdbOsOpen(pPager->jFileName, O_RDWR | O_CREAT); + pPager->jfd = tdbOsOpen(pPager->jFileName, TDB_O_CREAT | TDB_O_RDWR, 0755); if (pPager->jfd < 0) { return -1; } diff --git a/source/libs/tdb/src/inc/tdbOs.h b/source/libs/tdb/src/inc/tdbOs.h index 751c105913..bc610917f6 100644 --- a/source/libs/tdb/src/inc/tdbOs.h +++ b/source/libs/tdb/src/inc/tdbOs.h @@ -46,7 +46,15 @@ extern "C" { /* file */ typedef TdFilePtr tdb_fd_t; -#define tdbOsOpen taosOpenFile +#define TDB_O_CREAT TD_FILE_CTEATE +#define TDB_O_WRITE TD_FILE_WRITE +#define TDB_O_READ TD_FILE_READ +#define TDB_O_TRUNC TD_FILE_TRUNC +#define TDB_O_APPEND TD_FILE_APPEND +#define TDB_O_RDWR (TD_FILE_WRITE) | (TD_FILE_READ) + +#define tdbOsOpen(PATH, OPTION, MODE) taosOpenFile((PATH), (OPTION)) + #define tdbOsClose(FD) taosCloseFile(&(FD)) #define tdbOsRead taosReadFile #define tdbOsPRead taosPReadFile @@ -64,7 +72,15 @@ typedef TdFilePtr tdb_fd_t; /* file */ typedef int tdb_fd_t; -#define tdbOsOpen open +#define TDB_O_CREAT O_CREAT +#define TDB_O_WRITE O_WRONLY +#define TDB_O_READ O_RDONLY +#define TDB_O_TRUNC O_TRUNC +#define TDB_O_APPEND O_APPEND +#define TDB_O_RDWR O_RDWR + +#define tdbOsOpen(PATH, OPTION, MODE) open((PATH), (OPTION), (MODE)) + #define tdbOsClose close i64 tdbOsRead(tdb_fd_t fd, void *pData, i64 nBytes); From 249c33763221bf4aec3c999652f557deade7f7be Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 28 Mar 2022 08:22:55 +0000 Subject: [PATCH 4/5] refact TDB --- source/libs/tdb/src/inc/tdbOs.h | 65 ++++++++++++++------------------- 1 file changed, 28 insertions(+), 37 deletions(-) diff --git a/source/libs/tdb/src/inc/tdbOs.h b/source/libs/tdb/src/inc/tdbOs.h index bc610917f6..794d4c502a 100644 --- a/source/libs/tdb/src/inc/tdbOs.h +++ b/source/libs/tdb/src/inc/tdbOs.h @@ -23,30 +23,19 @@ extern "C" { // TODO: use cmake to control the option #define TDB_FOR_TDENGINE -// For memory ----------------- #ifdef TDB_FOR_TDENGINE +// For memory ----------------- #define tdbOsMalloc taosMemoryMalloc #define tdbOsCalloc taosMemoryCalloc #define tdbOsRealloc taosMemoryRealloc #define tdbOsFree taosMemoryFree -#else - -#define tdbOsMalloc malloc -#define tdbOsCalloc calloc -#define tdbOsRealloc realloc -#define tdbOsFree free - -#endif - // For file and directory ----------------- -#ifdef TDB_FOR_TDENGINE - /* file */ typedef TdFilePtr tdb_fd_t; -#define TDB_O_CREAT TD_FILE_CTEATE +#define TDB_O_CREAT TD_FILE_CTEATE #define TDB_O_WRITE TD_FILE_WRITE #define TDB_O_READ TD_FILE_READ #define TDB_O_TRUNC TD_FILE_TRUNC @@ -67,12 +56,37 @@ typedef TdFilePtr tdb_fd_t; #define tdbOsMkdir taosMkDir #define tdbOsRmdir taosRemoveDir +// For threads and lock ----------------- +/* spin lock */ +typedef TdThreadSpinlock tdb_spinlock_t; + +#define tdbSpinlockInit taosThreadSpinInit +#define tdbSpinlockDestroy taosThreadSpinDestroy +#define tdbSpinlockLock taosThreadSpinLock +#define tdbSpinlockUnlock taosThreadSpinUnlock +#define tdbSpinlockTrylock pthread_spin_trylock + +/* mutex lock */ +typedef TdThreadMutex tdb_mutex_t; + +#define tdbMutexInit taosThreadMutexInit +#define tdbMutexDestroy taosThreadMutexDestroy +#define tdbMutexLock taosThreadMutexLock +#define tdbMutexUnlock taosThreadMutexUnlock + #else +// For memory ----------------- +#define tdbOsMalloc malloc +#define tdbOsCalloc calloc +#define tdbOsRealloc realloc +#define tdbOsFree free + +// For file and directory ----------------- /* file */ typedef int tdb_fd_t; -#define TDB_O_CREAT O_CREAT +#define TDB_O_CREAT O_CREAT #define TDB_O_WRITE O_WRONLY #define TDB_O_READ O_RDONLY #define TDB_O_TRUNC O_TRUNC @@ -95,30 +109,7 @@ i64 tdbOsWrite(tdb_fd_t fd, const void *pData, i64 nBytes); #define tdbOsMkdir mkdir #define tdbOsRmdir rmdir -#endif - // For threads and lock ----------------- -#ifdef TDB_FOR_TDENGINE - -/* spin lock */ -typedef TdThreadSpinlock tdb_spinlock_t; - -#define tdbSpinlockInit taosThreadSpinInit -#define tdbSpinlockDestroy taosThreadSpinDestroy -#define tdbSpinlockLock taosThreadSpinLock -#define tdbSpinlockUnlock taosThreadSpinUnlock -#define tdbSpinlockTrylock pthread_spin_trylock - -/* mutex lock */ -typedef TdThreadMutex tdb_mutex_t; - -#define tdbMutexInit taosThreadMutexInit -#define tdbMutexDestroy taosThreadMutexDestroy -#define tdbMutexLock taosThreadMutexLock -#define tdbMutexUnlock taosThreadMutexUnlock - -#else - /* spin lock */ typedef pthread_spinlock_t tdb_spinlock_t; From 27a567543b903bae3369dfe263fd1d7e2086e9f5 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 28 Mar 2022 08:44:04 +0000 Subject: [PATCH 5/5] fix mutex problem --- source/libs/tdb/src/db/tdbPCache.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/tdb/src/db/tdbPCache.c b/source/libs/tdb/src/db/tdbPCache.c index 981dd63593..07c267a15c 100644 --- a/source/libs/tdb/src/db/tdbPCache.c +++ b/source/libs/tdb/src/db/tdbPCache.c @@ -122,7 +122,7 @@ static void tdbPCacheClearLock(SPCache *pCache) { tdbMutexDestroy(&(pCache->mute static void tdbPCacheLock(SPCache *pCache) { tdbMutexLock(&(pCache->mutex)); } -static void tdbPCacheUnlock(SPCache *pCache) { tdbMutexDestroy(&(pCache->mutex)); } +static void tdbPCacheUnlock(SPCache *pCache) { tdbMutexUnlock(&(pCache->mutex)); } static bool tdbPCacheLocked(SPCache *pCache) { assert(0);