diff --git a/Jenkinsfile2 b/Jenkinsfile2 index 754617f99f..12e806c87a 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -43,6 +43,7 @@ def pre_test(){ cd ${WKC} git reset --hard git clean -fxd + rm -rf examples/rust/ git remote prune origin git fetch ''' diff --git a/examples/c/stream_demo.c b/examples/c/stream_demo.c index dd4fbc8d2d..2fcf4dd62c 100644 --- a/examples/c/stream_demo.c +++ b/examples/c/stream_demo.c @@ -98,10 +98,9 @@ int32_t create_stream() { /*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/ /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ - pRes = - taos_query(pConn, - "create stream stream1 trigger max_delay 10s into outstb as select _wstart, sum(k) from st1 partition " - "by tbname session(ts, 10s) "); + pRes = taos_query(pConn, + "create stream stream1 trigger max_delay 10s watermark 10s into outstb as select _wstart start, " + "count(k) from st1 partition by tbname interval(20s) "); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 26d65db8f4..14efec8757 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -60,6 +60,7 @@ enum { STREAM_INPUT__DATA_RETRIEVE, STREAM_INPUT__GET_RES, STREAM_INPUT__CHECKPOINT, + STREAM_INPUT__DESTROY, }; typedef enum EStreamType { diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f17fdb881d..907010044b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2663,6 +2663,10 @@ typedef struct { SEpSet epSet; } SVgEpSet; +typedef struct { + int32_t padding; +} SRSmaExecMsg; + typedef struct { int64_t suid; int8_t level; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index b16df0e885..16d5965759 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -202,6 +202,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_FETCH_RSMA, "vnode-fetch-rsma", SRSmaFetchMsg, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_EXEC_RSMA, "vnode-exec-rsma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_BATCH_DEL, "batch-delete", SBatchDeleteReq, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e6fcb021d5..384c6a289f 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -53,6 +53,7 @@ enum { TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE, TASK_SCHED_STATUS__FAILED, + TASK_SCHED_STATUS__DROPPING, }; enum { @@ -127,6 +128,10 @@ typedef struct { int8_t type; } SStreamCheckpoint; +typedef struct { + int8_t type; +} SStreamTaskDestroy; + typedef struct { int8_t type; SSDataBlock* pBlock; @@ -211,7 +216,6 @@ typedef struct { void* vnode; FTbSink* tbSinkFunc; STSchema* pTSchema; - SHashObj* pHash; // groupId to tbuid } STaskSinkTb; typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 952066df46..790cbf906d 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -34,6 +34,7 @@ extern bool gRaftDetailLog; #define SYNC_MAX_PROGRESS_WAIT_MS 4000 #define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20) #define SYNC_MAX_RECV_TIME_RANGE_MS 1000 +#define SYNC_ADD_QUORUM_COUNT 3 #define SYNC_MAX_BATCH_SIZE 1 #define SYNC_INDEX_BEGIN 0 diff --git a/include/util/taoserror.h b/include/util/taoserror.h index d7ec3697af..12d6127165 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -291,6 +291,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_STREAM_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F1) #define TSDB_CODE_MND_INVALID_STREAM_OPTION TAOS_DEF_ERROR_CODE(0, 0x03F2) #define TSDB_CODE_MND_STREAM_MUST_BE_DELETED TAOS_DEF_ERROR_CODE(0, 0x03F3) +#define TSDB_CODE_MND_STREAM_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x03F4) // mnode-sma #define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480) @@ -614,6 +615,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_RSMA_REMOVE_EXISTS TAOS_DEF_ERROR_CODE(0, 0x3154) #define TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x3155) #define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156) +#define TSDB_CODE_RSMA_INVALID_SCHEMA TAOS_DEF_ERROR_CODE(0, 0x3157) //index #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 8eb3ed3901..e610b41a04 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -338,6 +338,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_EXEC_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 455da6a40e..8cff7fe48e 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -636,6 +636,7 @@ typedef struct { int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj); int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj); +void tFreeStreamObj(SStreamObj* pObj); typedef struct { char streamName[TSDB_STREAM_FNAME_LEN]; diff --git a/source/dnode/mnode/impl/inc/mndStb.h b/source/dnode/mnode/impl/inc/mndStb.h index 44a7fdadde..010199a89f 100644 --- a/source/dnode/mnode/impl/inc/mndStb.h +++ b/source/dnode/mnode/impl/inc/mndStb.h @@ -34,6 +34,7 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate); SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName); int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb); int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb); +void mndFreeStb(SStbObj *pStb); void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst); void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 08ce161409..e6f1a40993 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -116,6 +116,25 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { return 0; } +void tFreeStreamObj(SStreamObj *pStream) { + taosMemoryFree(pStream->sql); + taosMemoryFree(pStream->ast); + taosMemoryFree(pStream->physicalPlan); + if (pStream->outputSchema.nCols) taosMemoryFree(pStream->outputSchema.pSchema); + + int32_t sz = taosArrayGetSize(pStream->tasks); + for (int32_t i = 0; i < sz; i++) { + SArray *pLevel = taosArrayGetP(pStream->tasks, i); + int32_t taskSz = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < taskSz; j++) { + SStreamTask *pTask = taosArrayGetP(pLevel, j); + tFreeSStreamTask(pTask); + } + taosArrayDestroy(pLevel); + } + taosArrayDestroy(pStream->tasks); +} + SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) { SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp)); if (pVgEpNew == NULL) return NULL; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index a24b7ef459..3bfd7eb596 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -424,6 +424,8 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { } mndAddTaskToTaskSet(taskSourceLevel, pTask); + pTask->triggerParam = 0; + // source pTask->taskLevel = TASK_LEVEL__SOURCE; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index dd2b595c29..e0f2b83160 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -266,6 +266,15 @@ _OVER: return pRow; } +void mndFreeStb(SStbObj *pStb) { + taosArrayDestroy(pStb->pFuncs); + taosMemoryFreeClear(pStb->pColumns); + taosMemoryFreeClear(pStb->pTags); + taosMemoryFreeClear(pStb->comment); + taosMemoryFreeClear(pStb->pAst1); + taosMemoryFreeClear(pStb->pAst2); +} + static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) { mTrace("stb:%s, perform insert action, row:%p", pStb->name, pStb); return 0; @@ -273,12 +282,7 @@ static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) { static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) { mTrace("stb:%s, perform delete action, row:%p", pStb->name, pStb); - taosArrayDestroy(pStb->pFuncs); - taosMemoryFreeClear(pStb->pColumns); - taosMemoryFreeClear(pStb->pTags); - taosMemoryFreeClear(pStb->comment); - taosMemoryFreeClear(pStb->pAst1); - taosMemoryFreeClear(pStb->pAst2); + mndFreeStb(pStb); return 0; } @@ -2021,8 +2025,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName, FOREACH(pNode, pNodeList) { SColumnNode *pCol = (SColumnNode *)pNode; - if (pCol->tableId != suid) { - mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId); + if (pCol->tableId == suid) { sdbRelease(pSdb, pTopic); nodesDestroyNode(pAst); return -1; @@ -2045,6 +2048,16 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) break; + if (pStream->smaId != 0) { + sdbRelease(pSdb, pStream); + continue; + } + + if (pStream->targetStbUid == suid) { + sdbRelease(pSdb, pStream); + return -1; + } + SNode *pAst = NULL; if (nodesStringToNode(pStream->ast, &pAst) != 0) { ASSERT(0); @@ -2057,8 +2070,7 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, FOREACH(pNode, pNodeList) { SColumnNode *pCol = (SColumnNode *)pNode; - if (pCol->tableId != suid) { - mDebug("stream:%s, check colId:%d passed", pStream->name, pCol->colId); + if (pCol->tableId == suid) { sdbRelease(pSdb, pStream); nodesDestroyNode(pAst); return -1; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8c453e0c88..6dc8e2072b 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -167,6 +167,9 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) { static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) { mTrace("stream:%s, perform delete action", pStream->name); + taosWLockLatch(&pStream->lock); + tFreeStreamObj(pStream); + taosWUnLockLatch(&pStream->lock); return 0; } @@ -493,10 +496,17 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre stbObj.uid = pStream->targetStbUid; - if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER; + if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) { + mndFreeStb(&stbObj); + goto _OVER; + } + + tFreeSMCreateStbReq(&createReq); + mndFreeStb(&stbObj); return 0; _OVER: + tFreeSMCreateStbReq(&createReq); mndReleaseStb(pMnode, pStb); mndReleaseDb(pMnode, pDb); return -1; @@ -715,6 +725,7 @@ _OVER: mndReleaseDb(pMnode, pDb); tFreeSCMCreateStreamReq(&createStreamReq); + tFreeStreamObj(&streamObj); return code; } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 560ff97ae6..9686624341 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -63,6 +63,7 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId); int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen); int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list); int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list); +int32_t vnodeGetStbIdList(SVnode *pVnode, int64_t suid, SArray* list); void *vnodeGetIdx(SVnode *pVnode); void *vnodeGetIvtIdx(SVnode *pVnode); diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 944d7759b2..c43772062e 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -57,9 +57,10 @@ typedef struct { void *tmrHandle; // shared by all fetch tasks } SSmaMgmt; -#define SMA_ENV_LOCK(env) (&(env)->lock) -#define SMA_ENV_TYPE(env) ((env)->type) -#define SMA_ENV_STAT(env) ((env)->pStat) +#define SMA_ENV_LOCK(env) (&(env)->lock) +#define SMA_ENV_TYPE(env) ((env)->type) +#define SMA_ENV_STAT(env) ((env)->pStat) +#define SMA_RSMA_STAT(sma) ((SRSmaStat *)SMA_ENV_STAT((SSmaEnv *)(sma)->pRSmaEnv)) struct STSmaStat { int8_t state; // ETsdbSmaStat @@ -86,15 +87,17 @@ struct SQTaskFWriter { }; struct SRSmaStat { - SSma *pSma; - int64_t commitAppliedVer; // vnode applied version for async commit - int64_t refId; // shared by fetch tasks - SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo) - int8_t triggerStat; // shared by fetch tasks - int8_t commitStat; // 0 not in committing, 1 in committing - SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w) - SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; - SHashObj *iRsmaInfoHash; // key: stbUid, value: SRSmaInfo; immutable rsmaInfoHash + SSma *pSma; + int64_t commitAppliedVer; // vnode applied version for async commit + int64_t refId; // shared by fetch tasks + volatile int64_t qBufSize; // queue buffer size + SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo) + int8_t triggerStat; // shared by fetch tasks + int8_t commitStat; // 0 not in committing, 1 in committing + int8_t execStat; // 0 not in exec , 1 in exec + SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w) + SHashObj *infoHash; // key: suid, value: SRSmaInfo + SHashObj *fetchHash; // key: suid, value: L1 or L2 or L1|L2 }; struct SSmaStat { @@ -105,34 +108,40 @@ struct SSmaStat { T_REF_DECLARE() }; -#define SMA_TSMA_STAT(s) (&(s)->tsmaStat) -#define SMA_RSMA_STAT(s) (&(s)->rsmaStat) -#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash) -#define RSMA_IMU_INFO_HASH(r) ((r)->iRsmaInfoHash) -#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) -#define RSMA_COMMIT_STAT(r) (&(r)->commitStat) -#define RSMA_REF_ID(r) ((r)->refId) -#define RSMA_FS_LOCK(r) (&(r)->lock) +#define SMA_STAT_TSMA(s) (&(s)->tsmaStat) +#define SMA_STAT_RSMA(s) (&(s)->rsmaStat) +#define RSMA_INFO_HASH(r) ((r)->infoHash) +#define RSMA_FETCH_HASH(r) ((r)->fetchHash) +#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) +#define RSMA_COMMIT_STAT(r) (&(r)->commitStat) +#define RSMA_REF_ID(r) ((r)->refId) +#define RSMA_FS_LOCK(r) (&(r)->lock) struct SRSmaInfoItem { - int8_t level; - int8_t triggerStat; - int32_t maxDelay; - tmr_h tmrId; + int8_t level; + int8_t triggerStat; + uint16_t interval; // second + int32_t maxDelay; + tmr_h tmrId; }; struct SRSmaInfo { STSchema *pTSchema; int64_t suid; int64_t refId; // refId of SRSmaStat - int8_t delFlag; + uint64_t delFlag : 1; + uint64_t lastReceived : 63; // second T_REF_DECLARE() SRSmaInfoItem items[TSDB_RETENTION_L2]; void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t - void *iTaskInfo[TSDB_RETENTION_L2]; // immutable + STaosQueue *queue; // buffer queue of SubmitReq + STaosQall *qall; // buffer qall of SubmitReq + void *iTaskInfo[TSDB_RETENTION_L2]; // immutable qTaskInfo_t + STaosQueue *iQueue; // immutable buffer queue of SubmitReq + STaosQall *iQall; // immutable buffer qall of SubmitReq }; -#define RSMA_INFO_HEAD_LEN 32 +#define RSMA_INFO_HEAD_LEN offsetof(SRSmaInfo, items) #define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1) #define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1) #define RSMA_INFO_QTASK(r, i) ((r)->taskInfo[i]) @@ -161,6 +170,12 @@ enum { RSMA_RESTORE_SYNC = 2, }; +typedef enum { + RSMA_EXEC_OVERFLOW = 1, // triggered by queue buf overflow + RSMA_EXEC_TIMEOUT = 2, // triggered by timer + RSMA_EXEC_COMMIT = 3, // triggered by commit +} ERsmaExecType; + void tdDestroySmaEnv(SSmaEnv *pSmaEnv); void *tdFreeSmaEnv(SSmaEnv *pSmaEnv); @@ -228,12 +243,13 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) { void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName); void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName); -int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo **pDest, SRSmaInfo *pSrc); +int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); +int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type); int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer); diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 5164e22474..898e79928b 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -65,6 +65,7 @@ struct SVBufPool { SVBufPool* next; SVnode* pVnode; volatile int32_t nRef; + TdThreadSpinlock lock; int64_t size; uint8_t* ptr; SVBufPoolNode* pTail; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 0c7a08a2b5..7ac1cc4f0e 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -199,6 +199,7 @@ int32_t smaAsyncCommit(SSma* pSma); int32_t smaAsyncPostCommit(SSma* pSma); int32_t smaDoRetention(SSma* pSma, int64_t now); int32_t smaProcessFetch(SSma* pSma, void* pMsg); +int32_t smaProcessExec(SSma* pSma, void* pMsg); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 373cfdfb47..8b92475035 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -83,8 +83,7 @@ int32_t smaBegin(SSma *pSma) { return TSDB_CODE_SUCCESS; } - SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); + SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); int8_t rsmaTriggerStat = atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED, TASK_TRIGGER_STAT_ACTIVE); @@ -123,7 +122,7 @@ static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) { } SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); + SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat); // step 1: set rsma stat paused atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); @@ -289,8 +288,7 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) { return TSDB_CODE_SUCCESS; } - SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(SMA_ENV_STAT(pSmaEnv)); + SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); // cleanup outdated qtaskinfo files tdCleanupQTaskInfoFiles(pSma, pRSmaStat); @@ -299,10 +297,9 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) { } /** - * @brief Rsma async commit implementation + * @brief Rsma async commit implementation(only do some necessary light weighted task) * 1) set rsma stat TASK_TRIGGER_STAT_PAUSED * 2) Wait all running fetch task finish to fetch and put submitMsg into level 2/3 wQueue(blocking level 1 write) - * 3) * * @param pSma * @return int32_t @@ -314,7 +311,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { } SSmaStat *pStat = SMA_ENV_STAT(pEnv); - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); + SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat); // step 1: set rsma stat atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); @@ -336,28 +333,55 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { } } - // step 3: swap rsmaInfoHash and iRsmaInfoHash + /** + * @brief step 3: consume the SubmitReq in buffer + * 1) This is high cost task and should not put in asyncPreCommit originally. + * 2) But, if put in asyncCommit, would trigger taskInfo cloning frequently. + */ + nLoops = 0; + smaInfo("vgId:%d, start to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); + + int8_t old; + while (1) { + old = atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1); + if (old == 0) break; + if (++nLoops > 1000) { + sched_yield(); + nLoops = 0; + smaDebug("vgId:%d, wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); + } + } + + smaInfo("vgId:%d, end to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); + + if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_COMMIT) < 0) { + atomic_store_8(&pRSmaStat->execStat, 0); + return TSDB_CODE_FAILED; + } + + // step 4: swap queue/qall and iQueue/iQall // lock taosWLockLatch(SMA_ENV_LOCK(pEnv)); ASSERT(RSMA_INFO_HASH(pRSmaStat)); - ASSERT(!RSMA_IMU_INFO_HASH(pRSmaStat)); - RSMA_IMU_INFO_HASH(pRSmaStat) = RSMA_INFO_HASH(pRSmaStat); - RSMA_INFO_HASH(pRSmaStat) = - taosHashInit(RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL); - if (!RSMA_INFO_HASH(pRSmaStat)) { - // unlock - taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); - smaError("vgId:%d, rsma async commit failed since %s", SMA_VID(pSma), terrstr()); - return TSDB_CODE_FAILED; + while (pIter) { + SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; + TSWAP(pInfo->iQall, pInfo->qall); + TSWAP(pInfo->iQueue, pInfo->queue); + TSWAP(pInfo->iTaskInfo[0], pInfo->taskInfo[0]); + TSWAP(pInfo->iTaskInfo[1], pInfo->taskInfo[1]); + pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter); } + atomic_store_64(&pRSmaStat->qBufSize, 0); + atomic_store_8(&pRSmaStat->execStat, 0); // unlock taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); - // step 4: others + // step 5: others pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; return TSDB_CODE_SUCCESS; @@ -375,17 +399,18 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { return TSDB_CODE_SUCCESS; } - SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); + SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); - // perform persist task for qTaskInfo - tdRSmaPersistExecImpl(pRSmaStat, RSMA_IMU_INFO_HASH(pRSmaStat)); + // perform persist task for qTaskInfo operator + if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) { + return TSDB_CODE_FAILED; + } return TSDB_CODE_SUCCESS; } /** - * @brief Migrate rsmaInfo from iRsmaInfo to rsmaInfo if rsmaInfoHash not empty. + * @brief Migrate rsmaInfo from iRsmaInfo to rsmaInfo if rsma infoHash not empty. * * @param pSma * @return int32_t @@ -396,65 +421,66 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { return TSDB_CODE_SUCCESS; } - SSmaStat *pStat = SMA_ENV_STAT(pEnv); - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); + SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); + SArray *rsmaDeleted = NULL; - // step 1: merge rsmaInfoHash and iRsmaInfoHash + // step 1: merge qTaskInfo and iQTaskInfo // lock taosWLockLatch(SMA_ENV_LOCK(pEnv)); -#if 0 - if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) { - // just switch the hash pointer if rsmaInfoHash is empty - if (taosHashGetSize(RSMA_IMU_INFO_HASH(pRSmaStat)) > 0) { - SHashObj *infoHash = RSMA_INFO_HASH(pRSmaStat); - RSMA_INFO_HASH(pRSmaStat) = RSMA_IMU_INFO_HASH(pRSmaStat); - RSMA_IMU_INFO_HASH(pRSmaStat) = infoHash; - } - } else { -#endif -#if 1 - void *pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), NULL); + + void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL); while (pIter) { - tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); - - if (!taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t))) { - SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter; - if (RSMA_INFO_IS_DEL(pRSmaInfo)) { - int32_t refVal = T_REF_VAL_GET(pRSmaInfo); - if (refVal == 0) { - tdFreeRSmaInfo(pSma, pRSmaInfo, true); - smaDebug( - "vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for " - "table:%" PRIi64, - SMA_VID(pSma), *pSuid); - } else { - smaDebug( - "vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for " - "table:%" PRIi64, - SMA_VID(pSma), refVal, *pSuid); + tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); + SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter; + if (RSMA_INFO_IS_DEL(pRSmaInfo)) { + int32_t refVal = T_REF_VAL_GET(pRSmaInfo); + if (refVal == 0) { + if (!rsmaDeleted) { + if ((rsmaDeleted = taosArrayInit(1, sizeof(tb_uid_t)))) { + taosArrayPush(rsmaDeleted, pSuid); + } } - - pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter); - continue; + } else { + smaDebug( + "vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for " + "table:%" PRIi64, + SMA_VID(pSma), refVal, *pSuid); } - taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter)); - smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), - *pSuid); - } else { - // free the resources - SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter; - tdFreeRSmaInfo(pSma, pRSmaInfo, false); - smaDebug("vgId:%d, rsma async post commit, free rsma info since already COW for table:%" PRIi64, SMA_VID(pSma), - *pSuid); + + pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter); + continue; } - pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter); - } -#endif - // } + if (pRSmaInfo->taskInfo[0]) { + if (pRSmaInfo->iTaskInfo[0]) { + SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pRSmaInfo->iTaskInfo[0]; + tdFreeRSmaInfo(pSma, pRSmaInfo, true); + pRSmaInfo->iTaskInfo[0] = NULL; + } + } else { + TSWAP(pRSmaInfo->taskInfo[0], pRSmaInfo->iTaskInfo[0]); + } - taosHashCleanup(RSMA_IMU_INFO_HASH(pRSmaStat)); - RSMA_IMU_INFO_HASH(pRSmaStat) = NULL; + taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter)); + smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid); + + pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter); + } + + for (int32_t i = 0; i < taosArrayGetSize(rsmaDeleted); ++i) { + tb_uid_t *pSuid = taosArrayGet(rsmaDeleted, i); + void *pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t)); + if ((pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { + tdFreeRSmaInfo(pSma, pRSmaInfo, true); + smaDebug( + "vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for " + "table:%" PRIi64, + SMA_VID(pSma), *pSuid); + } + taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t)); + } + taosArrayDestroy(rsmaDeleted); + // TODO: remove suid in files? // unlock taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index ccb6ad3a72..f51aad22bd 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -171,7 +171,7 @@ int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) { int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { if (!pRSmaInfo) return 0; - + int ref = T_REF_INC(pRSmaInfo); smaDebug("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); return 0; @@ -228,7 +228,12 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS RSMA_INFO_HASH(pRSmaStat) = taosHashInit( RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); if (!RSMA_INFO_HASH(pRSmaStat)) { - taosMemoryFreeClear(*pSmaStat); + return TSDB_CODE_FAILED; + } + + RSMA_FETCH_HASH(pRSmaStat) = taosHashInit( + RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + if (!RSMA_FETCH_HASH(pRSmaStat)) { return TSDB_CODE_FAILED; } } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { @@ -264,8 +269,6 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED); // step 2: destroy the rsma info and associated fetch tasks - // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready. -#if 1 if (taosHashGetSize(RSMA_INFO_HASH(pStat)) > 0) { void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL); while (infoHash) { @@ -274,10 +277,12 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), infoHash); } } -#endif taosHashCleanup(RSMA_INFO_HASH(pStat)); - // step 3: wait all triggered fetch tasks finished + // step 3: destroy the rsma fetch hash + taosHashCleanup(RSMA_FETCH_HASH(pStat)); + + // step 4: wait all triggered fetch tasks finished int32_t nLoops = 0; while (1) { if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) { @@ -293,7 +298,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { } } - // step 4: free pStat + // step 5: free pStat taosMemoryFreeClear(pStat); } } @@ -318,9 +323,9 @@ void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { if (pSmaStat) { if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { - tdDestroyTSmaStat(SMA_TSMA_STAT(pSmaStat)); + tdDestroyTSmaStat(SMA_STAT_TSMA(pSmaStat)); } else if (smaType == TSDB_SMA_TYPE_ROLLUP) { - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSmaStat); + SRSmaStat *pRSmaStat = &pSmaStat->rsmaStat; int32_t vid = SMA_VID(pRSmaStat->pSma); int64_t refId = RSMA_REF_ID(pRSmaStat); if (taosRemoveRef(smaMgmt.rsetId, RSMA_REF_ID(pRSmaStat)) < 0) { diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index b7a2efd489..9b3b0cb63d 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -15,8 +15,10 @@ #include "sma.h" -#define RSMA_QTASKINFO_BUFSIZE 32768 +#define RSMA_QTASKINFO_BUFSIZE (32768) #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid +#define RSMA_QTASKEXEC_BUFSIZE (1048576) +#define RSMA_SUBMIT_BATCH_SIZE (1024) SSmaMgmt smaMgmt = { .inited = 0, @@ -27,17 +29,20 @@ SSmaMgmt smaMgmt = { #define TD_RSMAINFO_DEL_FILE "rsmainfo.del" typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem; typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter; +typedef struct SRSmaExecQItem SRSmaExecQItem; static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids); static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, int8_t idx); -static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid, - int8_t level); +static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo, + ERsmaExecType type, int8_t level); static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid); static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); +static void tdFreeRSmaSubmitItems(SArray *pItems); +static int32_t tdRSmaConsumeAndFetch(SSma *pSma, int64_t suid, int8_t level, SArray *pSubmitArr); static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, - int64_t suid, int8_t blkType); + int64_t suid); static void tdRSmaFetchTrigger(void *param, void *tmrId); static int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level); static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); @@ -76,6 +81,11 @@ struct SRSmaQTaskInfoIter { int32_t nBufPos; }; +struct SRSmaExecQItem { + void *pRSmaInfo; + void *qall; +}; + void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName) { tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName); } @@ -139,6 +149,18 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { if (isDeepFree) { taosMemoryFreeClear(pInfo->pTSchema); } + + if (isDeepFree) { + if (pInfo->queue) taosCloseQueue(pInfo->queue); + if (pInfo->qall) taosFreeQall(pInfo->qall); + if (pInfo->iQueue) taosCloseQueue(pInfo->iQueue); + if (pInfo->iQall) taosFreeQall(pInfo->iQall); + pInfo->queue = NULL; + pInfo->qall = NULL; + pInfo->iQueue = NULL; + pInfo->iQall = NULL; + } + taosMemoryFree(pInfo); } @@ -179,7 +201,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids) for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pRSmaInfo->taskInfo[i]) { - if ((qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, true) < 0)) { + if (((terrno = qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, true)) < 0)) { tdReleaseRSmaInfo(pSma, pRSmaInfo); smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i, terrstr()); @@ -351,6 +373,19 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con goto _err; } pRSmaInfo->pTSchema = pTSchema; + if (!(pRSmaInfo->queue = taosOpenQueue())) { + goto _err; + } + + if (!(pRSmaInfo->qall = taosAllocateQall())) { + goto _err; + } + if (!(pRSmaInfo->iQueue = taosOpenQueue())) { + goto _err; + } + if (!(pRSmaInfo->iQall = taosAllocateQall())) { + goto _err; + } pRSmaInfo->suid = suid; pRSmaInfo->refId = RSMA_REF_ID(pStat); T_REF_INIT_VAL(pRSmaInfo, 1); @@ -419,8 +454,7 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) { return TSDB_CODE_SUCCESS; } - SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); + SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pReq->suid); @@ -528,6 +562,14 @@ void *tdUidStoreFree(STbUidStore *pStore) { return NULL; } +/** + * @brief The SubmitReq for rsma L2/L3 is inserted by tsdbInsertData method directly while not by WriteQ, as the queue + * would be freed when close Vnode, thus lock should be used if with race condition. + * @param pTsdb + * @param version + * @param pReq + * @return int32_t + */ static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) { if (!pReq) { terrno = TSDB_CODE_INVALID_PTR; @@ -535,7 +577,7 @@ static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) { } SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; - + // TODO: spin lock for race conditiond if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) { return TSDB_CODE_FAILED; } @@ -569,17 +611,6 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { return 0; } -static void tdDestroySDataBlockArray(SArray *pArray) { - // TODO -#if 0 - for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { - SSDataBlock *pDataBlock = taosArrayGet(pArray, i); - blockDestroyInner(pDataBlock); - } -#endif - taosArrayDestroy(pArray); -} - /** * @brief retention of rsma1/rsma2 * @@ -605,7 +636,7 @@ _end: } static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, - int64_t suid, int8_t blkType) { + int64_t suid) { SArray *pResList = taosArrayInit(1, POINTER_BYTES); if (pResList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -615,7 +646,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm while (1) { uint64_t ts; int32_t code = qExecTaskOpt(taskInfo, pResList, &ts); - if (code < 0) { + if (code < 0) { if (code == TSDB_CODE_QRY_IN_EXEC) { break; } else { @@ -637,8 +668,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm } else { smaDebug("vgId:%d, rsma %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level); } - -#if 1 +#if 0 char flag[10] = {0}; snprintf(flag, 10, "level %" PRIi8, pItem->level); blockDebugShowDataBlocks(pResList, flag); @@ -662,10 +692,9 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm goto _err; } taosMemoryFreeClear(pReq); - + smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version:%" PRIi64, SMA_VID(pSma), suid, pItem->level, output->info.version); - } } @@ -677,34 +706,113 @@ _err: return TSDB_CODE_FAILED; } -static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid, - int8_t level) { +/** + * @brief Copy msg to rsmaQueueBuffer for batch process + * + * @param pSma + * @param pMsg + * @param inputType + * @param pInfo + * @param suid + * @return int32_t + */ +static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo, + tb_uid_t suid) { + const SSubmitReq *pReq = (const SSubmitReq *)pMsg; + + void *qItem = taosAllocateQitem(pReq->header.contLen, DEF_QITEM); + if (!qItem) { + return TSDB_CODE_FAILED; + } + + memcpy(qItem, pMsg, pReq->header.contLen); + + taosWriteQitem(pInfo->queue, qItem); + + SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); + int64_t bufSize = atomic_add_fetch_64(&pRSmaStat->qBufSize, pReq->header.contLen); + + // smoothing consume + int32_t n = bufSize / RSMA_QTASKEXEC_BUFSIZE; + if (n > 1) { + if (n > 10) { + n = 10; + } + taosMsleep(n << 4); + if (n > 2) { + smaWarn("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma), + taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 4); + } else { + smaDebug("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma), + taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 4); + } + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) { + SSubmitMsgIter msgIter = {0}; + SSubmitBlkIter blkIter = {0}; + STSRow *row = NULL; + if (tInitSubmitMsgIter(pReq, &msgIter) < 0) return -1; + while (true) { + SSubmitBlk *pBlock = NULL; + if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; + if (pBlock == NULL) break; + tInitSubmitBlkIter(&msgIter, pBlock, &blkIter); + while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) { + smaDebug("vgId:%d, numOfRows:%d, suid:%" PRIi64 ", uid:%" PRIi64 ", version:%" PRIi64 ", ts:%" PRIi64, + SMA_VID(pSma), msgIter.numOfRows, msgIter.suid, msgIter.uid, pReq->version, row->ts); + } + } + return 0; +} + +/** + * @brief sync mode + * + * @param pSma + * @param pMsg + * @param msgSize + * @param inputType + * @param pInfo + * @param type + * @param level + * @return int32_t + */ +static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo, + ERsmaExecType type, int8_t level) { int32_t idx = level - 1; - if (!pInfo || !RSMA_INFO_QTASK(pInfo, idx)) { - smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid); + + void *qTaskInfo = (type == RSMA_EXEC_COMMIT) ? RSMA_INFO_IQTASK(pInfo, idx) : RSMA_INFO_QTASK(pInfo, idx); + if (!qTaskInfo) { + smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, + pInfo->suid); return TSDB_CODE_SUCCESS; } if (!pInfo->pTSchema) { - smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid); + smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, pInfo->suid); return TSDB_CODE_FAILED; } smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, - RSMA_INFO_QTASK(pInfo, idx), suid); + RSMA_INFO_QTASK(pInfo, idx), pInfo->suid); - if (qSetMultiStreamInput(RSMA_INFO_QTASK(pInfo, idx), pMsg, 1, inputType) < 0) { // INPUT__DATA_SUBMIT +#if 0 + for (int32_t i = 0; i < msgSize; ++i) { + SSubmitReq *pReq = *(SSubmitReq **)((char *)pMsg + i * sizeof(void *)); + smaDebug("vgId:%d, [%d][%d] version %" PRIi64, SMA_VID(pSma), msgSize, i, pReq->version); + tdRsmaPrintSubmitReq(pSma, pReq); + } +#endif + if (qSetMultiStreamInput(qTaskInfo, pMsg, msgSize, inputType) < 0) { smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); return TSDB_CODE_FAILED; } SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); - tdRSmaFetchAndSubmitResult(pSma, RSMA_INFO_QTASK(pInfo, idx), pItem, pInfo->pTSchema, suid, - STREAM_INPUT__DATA_SUBMIT); - atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); - - if (smaMgmt.tmrHandle) { - taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); - } + tdRSmaFetchAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid); return TSDB_CODE_SUCCESS; } @@ -739,51 +847,20 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } + if (!pRSmaInfo->taskInfo[0]) { + if (tdCloneRSmaInfo(pSma, pRSmaInfo) < 0) { + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + return NULL; + } + } tdRefRSmaInfo(pSma, pRSmaInfo); taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + ASSERT(pRSmaInfo->suid == suid); return pRSmaInfo; } taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); - if (RSMA_COMMIT_STAT(pStat) == 0) { // return NULL if not in committing stat - return NULL; - } - - // clone the SRSmaInfo from iRsmaInfoHash to rsmaInfoHash if in committing stat - SRSmaInfo *pCowRSmaInfo = NULL; - // lock - taosWLockLatch(SMA_ENV_LOCK(pEnv)); - if (!(pCowRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)))) { // 2-phase lock - void *iRSmaInfo = taosHashGet(RSMA_IMU_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); - if (iRSmaInfo) { - SRSmaInfo *pIRSmaInfo = *(SRSmaInfo **)iRSmaInfo; - if (pIRSmaInfo && !RSMA_INFO_IS_DEL(pIRSmaInfo)) { - if (tdCloneRSmaInfo(pSma, &pCowRSmaInfo, pIRSmaInfo) < 0) { - // unlock - taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); - smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr()); - return NULL; - } - smaDebug("vgId:%d, clone rsma info succeed for suid:%" PRIu64, SMA_VID(pSma), suid); - if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pCowRSmaInfo, sizeof(pCowRSmaInfo)) < 0) { - // unlock - taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); - smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr()); - return NULL; - } - } - } - } else { - pCowRSmaInfo = *(SRSmaInfo **)pCowRSmaInfo; - ASSERT(!pCowRSmaInfo); - } - - if (pCowRSmaInfo) { - tdRefRSmaInfo(pSma, pCowRSmaInfo); - } - // unlock - taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); - return pCowRSmaInfo; + return NULL; } static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { @@ -792,22 +869,90 @@ static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { } } -static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) { +/** + * @brief async mode + * + * @param pSma + * @param pMsg + * @param inputType + * @param suid + * @return int32_t + */ +static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) { SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid); if (!pRSmaInfo) { - smaError("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); + smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); return TSDB_CODE_SUCCESS; } if (inputType == STREAM_INPUT__DATA_SUBMIT) { - tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo, suid, TSDB_RETENTION_L1); - tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo, suid, TSDB_RETENTION_L2); + if (tdExecuteRSmaImplAsync(pSma, pMsg, inputType, pRSmaInfo, suid) < 0) { + tdReleaseRSmaInfo(pSma, pRSmaInfo); + return TSDB_CODE_FAILED; + } + if (smaMgmt.tmrHandle) { + SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, 0); + if (pItem->level > 0) { + atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); + } + pItem = RSMA_INFO_ITEM(pRSmaInfo, 1); + if (pItem->level > 0) { + atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); + } + } + } else { + ASSERT(0); } tdReleaseRSmaInfo(pSma, pRSmaInfo); return TSDB_CODE_SUCCESS; } +static int32_t tdRSmaExecCheck(SSma *pSma) { + SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); + int64_t bufSize = atomic_load_64(&pRSmaStat->qBufSize); + + if (bufSize < RSMA_QTASKEXEC_BUFSIZE) { + smaDebug("vgId:%d, bufSize is %d but has no chance to exec as less than %d", SMA_VID(pSma), bufSize, + RSMA_QTASKEXEC_BUFSIZE); + return TSDB_CODE_SUCCESS; + } + + if (atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1) == 1) { + smaDebug("vgId:%d, bufSize is %d but has no chance to exec as qTaskInfo occupied by another task", SMA_VID(pSma), + bufSize); + return TSDB_CODE_SUCCESS; + } + + smaDebug("vgId:%d, bufSize is %d and has chance to exec as qTaskInfo is free now", SMA_VID(pSma), bufSize); + + SRSmaExecMsg fetchMsg; + int32_t contLen = sizeof(SMsgHead); + void *pBuf = rpcMallocCont(0 + contLen); + + ((SMsgHead *)pBuf)->vgId = SMA_VID(pSma); + ((SMsgHead *)pBuf)->contLen = sizeof(SMsgHead); + + SRpcMsg rpcMsg = { + .code = 0, + .msgType = TDMT_VND_EXEC_RSMA, + .pCont = pBuf, + .contLen = contLen, + }; + + if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, QUERY_QUEUE, &rpcMsg)) != 0) { + smaError("vgId:%d, failed to put rsma exec msg into query-queue since %s", SMA_VID(pSma), terrstr()); + goto _err; + } + + smaDebug("vgId:%d, success to put rsma fetch msg into query-queue", SMA_VID(pSma)); + + return TSDB_CODE_SUCCESS; +_err: + atomic_store_8(&pRSmaStat->execStat, 0); + return TSDB_CODE_FAILED; +} + int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); if (!pEnv) { @@ -826,45 +971,62 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { tdFetchSubmitReqSuids(pMsg, &uidStore); if (uidStore.suid != 0) { - tdExecuteRSma(pSma, pMsg, inputType, uidStore.suid); + tdExecuteRSmaAsync(pSma, pMsg, inputType, uidStore.suid); void *pIter = taosHashIterate(uidStore.uidHash, NULL); while (pIter) { tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); - tdExecuteRSma(pSma, pMsg, inputType, *pTbSuid); + tdExecuteRSmaAsync(pSma, pMsg, inputType, *pTbSuid); pIter = taosHashIterate(uidStore.uidHash, pIter); } tdUidStoreDestory(&uidStore); + + tdRSmaExecCheck(pSma); } } return TSDB_CODE_SUCCESS; } +/** + * @brief retrieve rsma meta and init + * + * @param pSma + * @param nTables number of tables of rsma + * @return int32_t + */ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) { - SVnode *pVnode = pSma->pVnode; + SVnode *pVnode = pSma->pVnode; + SArray *suidList = NULL; + STbUidStore uidStore = {0}; + SMetaReader mr = {0}; - SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t)); - if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) { - taosArrayDestroy(suidList); + if (!(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + if (vnodeGetStbIdList(pSma->pVnode, 0, suidList) < 0) { smaError("vgId:%d, failed to restore rsma env since get stb id list error: %s", TD_VID(pVnode), terrstr()); - return TSDB_CODE_FAILED; + goto _err; } int64_t arrSize = taosArrayGetSize(suidList); - if (nTables) { - *nTables = arrSize; - } - if (arrSize == 0) { + if (nTables) { + *nTables = 0; + } taosArrayDestroy(suidList); smaDebug("vgId:%d, no need to restore rsma env since empty stb id list", TD_VID(pVnode)); return TSDB_CODE_SUCCESS; } - SMetaReader mr = {0}; + int64_t nRsmaTables = 0; metaReaderInit(&mr, SMA_META(pSma), 0); + if (!(uidStore.tbUids = taosArrayInit(1024, sizeof(tb_uid_t)))) { + goto _err; + } for (int64_t i = 0; i < arrSize; ++i) { tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i); smaDebug("vgId:%d, rsma restore, suid is %" PRIi64, TD_VID(pVnode), suid); @@ -877,6 +1039,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) { ASSERT(mr.me.type == TSDB_SUPER_TABLE); ASSERT(mr.me.uid == suid); if (TABLE_IS_ROLLUP(mr.me.flags)) { + ++nRsmaTables; SRSmaParam *param = &mr.me.stbEntry.rsmaParam; for (int i = 0; i < TSDB_RETENTION_L2; ++i) { smaDebug("vgId:%d, rsma restore, table:%" PRIi64 " level:%d, maxdelay:%" PRIi64 " watermark:%" PRIi64 @@ -887,17 +1050,40 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) { smaError("vgId:%d, rsma restore env failed for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr()); goto _err; } + + // reload all ctbUids for suid + uidStore.suid = suid; + if (vnodeGetCtbIdList(pVnode, suid, uidStore.tbUids) < 0) { + smaError("vgId:%d, rsma restore, get ctb idlist failed for %" PRIi64 " since %s", TD_VID(pVnode), suid, + terrstr()); + goto _err; + } + + if (tdUpdateTbUidList(pVnode->pSma, &uidStore) < 0) { + smaError("vgId:%d, rsma restore, update tb uid list failed for %" PRIi64 " since %s", TD_VID(pVnode), suid, + terrstr()); + goto _err; + } + + taosArrayClear(uidStore.tbUids); + smaDebug("vgId:%d, rsma restore env success for %" PRIi64, TD_VID(pVnode), suid); } } metaReaderClear(&mr); taosArrayDestroy(suidList); + tdUidStoreDestory(&uidStore); + + if (nTables) { + *nTables = nRsmaTables; + } return TSDB_CODE_SUCCESS; _err: metaReaderClear(&mr); taosArrayDestroy(suidList); + tdUidStoreDestory(&uidStore); return TSDB_CODE_FAILED; } @@ -1230,7 +1416,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { } for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { - qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pRSmaInfo, i); + qTaskInfo_t taskInfo = RSMA_INFO_IQTASK(pRSmaInfo, i); if (!taskInfo) { smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1); continue; @@ -1388,7 +1574,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { } _end: - // taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); + taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); } @@ -1400,7 +1586,7 @@ _end: * @param level * @return int32_t */ -int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) { +static int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) { SRSmaFetchMsg fetchMsg = {.suid = pInfo->suid, .level = level}; int32_t ret = 0; int32_t contLen = 0; @@ -1427,7 +1613,7 @@ int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) { .code = 0, .msgType = TDMT_VND_FETCH_RSMA, .pCont = pBuf, - .contLen = contLen, + .contLen = contLen + sizeof(SMsgHead), }; if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, QUERY_QUEUE, &rpcMsg)) != 0) { @@ -1452,13 +1638,11 @@ _err: * @return int32_t */ int32_t smaProcessFetch(SSma *pSma, void *pMsg) { - SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg; - SRSmaFetchMsg req = {0}; - SDecoder decoder = {0}; - void *pBuf = NULL; - SRSmaInfo *pInfo = NULL; - SRSmaInfoItem *pItem = NULL; - + SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg; + SRSmaFetchMsg req = {0}; + SDecoder decoder = {0}; + void *pBuf = NULL; + SRSmaStat *pRSmaStat = NULL; if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) { terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP; goto _err; @@ -1472,37 +1656,265 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) { goto _err; } - pInfo = tdAcquireRSmaInfoBySuid(pSma, req.suid); - if (!pInfo) { - if (terrno == TSDB_CODE_SUCCESS) { - terrno = TSDB_CODE_RSMA_EMPTY_INFO; + pRSmaStat = SMA_RSMA_STAT(pSma); + + if (atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1) == 0) { + SArray *pSubmitArr = NULL; + if (!(pSubmitArr = taosArrayInit(RSMA_SUBMIT_BATCH_SIZE, POINTER_BYTES))) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + atomic_store_8(&pRSmaStat->execStat, 0); + goto _err; } - smaWarn("vgId:%d, failed to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8 " since %s", SMA_VID(pSma), - req.suid, req.level, terrstr()); - goto _err; + tdRSmaConsumeAndFetch(pSma, req.suid, req.level, pSubmitArr); + atomic_store_8(&pRSmaStat->execStat, 0); + taosArrayDestroy(pSubmitArr); + } else { + int8_t level = req.level; + int8_t *val = taosHashGet(RSMA_FETCH_HASH(pRSmaStat), &req.suid, sizeof(req.suid)); + if (val) { + level |= (*val); + } + ASSERT(level >= 1 && level <= 3); + taosHashPut(RSMA_FETCH_HASH(pRSmaStat), &req.suid, sizeof(req.suid), &level, sizeof(level)); } - pItem = RSMA_INFO_ITEM(pInfo, req.level - 1); - - SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; - qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, req.level - 1); - if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { - goto _err; - } - if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid, STREAM_INPUT__DATA_BLOCK) < 0) { - goto _err; - } - - tdCleanupStreamInputDataBlock(taskInfo); - - tdReleaseRSmaInfo(pSma, pInfo); tDecoderClear(&decoder); smaDebug("vgId:%d, success to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma), req.suid, req.level); return TSDB_CODE_SUCCESS; _err: - tdReleaseRSmaInfo(pSma, pInfo); tDecoderClear(&decoder); smaError("vgId:%d, failed to process rsma fetch msg since %s", SMA_VID(pSma), terrstr()); return TSDB_CODE_FAILED; } + +static void tdFreeRSmaSubmitItems(SArray *pItems) { + for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) { + taosFreeQitem(*(void **)taosArrayGet(pItems, i)); + } +} + +static int32_t tdRSmaConsumeAndFetch(SSma *pSma, int64_t suid, int8_t level, SArray *pSubmitArr) { + SRSmaInfo *pInfo = tdAcquireRSmaInfoBySuid(pSma, suid); + if (!pInfo) { + return TSDB_CODE_SUCCESS; + } + + // step 1: consume submit req + int64_t qMemSize = 0; + if ((qMemSize = taosQueueMemorySize(pInfo->queue) > 0)) { + taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock + + SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); + atomic_fetch_sub_64(&pRSmaStat->qBufSize, qMemSize); + + taosArrayClear(pSubmitArr); + + while (1) { + void *msg = NULL; + taosGetQitem(pInfo->qall, (void **)&msg); + if (msg) { + if (taosArrayPush(pSubmitArr, &msg) < 0) { + tdFreeRSmaSubmitItems(pSubmitArr); + goto _err; + } + } else { + break; + } + } + + int32_t size = taosArrayGetSize(pSubmitArr); + if (size > 0) { + for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { + if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, RSMA_EXEC_TIMEOUT, i) < + 0) { + tdFreeRSmaSubmitItems(pSubmitArr); + goto _err; + } + } + + tdFreeRSmaSubmitItems(pSubmitArr); + } + } + + // step 2: fetch rsma result + SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; + for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) { + if (level & i) { + qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, i - 1); + if (!taskInfo) { + continue; + } + if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { + goto _err; + } + SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1); + if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, suid) < 0) { + tdCleanupStreamInputDataBlock(taskInfo); + goto _err; + } + + tdCleanupStreamInputDataBlock(taskInfo); + } + } + +_end: + tdReleaseRSmaInfo(pSma, pInfo); + return TSDB_CODE_SUCCESS; +_err: + tdReleaseRSmaInfo(pSma, pInfo); + return TSDB_CODE_FAILED; +} + +/** + * @brief + * + * @param pSma + * @param type + * @return int32_t + */ +int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); + SHashObj *infoHash = NULL; + SArray *pSubmitQArr = NULL; + SArray *pSubmitArr = NULL; + bool isFetchAll = false; + + if (!pRSmaStat || !(infoHash = RSMA_INFO_HASH(pRSmaStat))) { + terrno = TSDB_CODE_RSMA_INVALID_STAT; + goto _err; + } + + if (type == RSMA_EXEC_OVERFLOW) { + taosRLockLatch(SMA_ENV_LOCK(pEnv)); + if (atomic_load_64(&pRSmaStat->qBufSize) < RSMA_QTASKEXEC_BUFSIZE) { + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + return TSDB_CODE_SUCCESS; + } + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + } + + if (!(pSubmitQArr = taosArrayInit(taosHashGetSize(infoHash), sizeof(SRSmaExecQItem)))) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + if (!(pSubmitArr = taosArrayInit(RSMA_SUBMIT_BATCH_SIZE, POINTER_BYTES))) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + // step 1: rsma exec - consume data in buffer queue for all suids + SRSmaExecQItem qItem = {0}; + void *pIter = taosHashIterate(infoHash, NULL); // infoHash has r/w lock + if (type == RSMA_EXEC_OVERFLOW) { + while (pIter) { + SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; + if (taosQueueItemSize(pInfo->queue)) { + taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock + qItem.qall = &pInfo->qall; + qItem.pRSmaInfo = pIter; + taosArrayPush(pSubmitQArr, &qItem); + } + ASSERT(taosQueueItemSize(pInfo->queue) == 0); + pIter = taosHashIterate(infoHash, pIter); + } + } else if (type == RSMA_EXEC_COMMIT) { + while (pIter) { + SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; + if (taosQueueItemSize(pInfo->iQueue)) { + taosReadAllQitems(pInfo->iQueue, pInfo->iQall); + qItem.qall = &pInfo->iQall; + qItem.pRSmaInfo = pIter; + taosArrayPush(pSubmitQArr, &qItem); + } + ASSERT(taosQueueItemSize(pInfo->iQueue) == 0); + pIter = taosHashIterate(infoHash, pIter); + } + } else { + ASSERT(0); + } + atomic_store_64(&pRSmaStat->qBufSize, 0); + + int32_t qSize = taosArrayGetSize(pSubmitQArr); + for (int32_t i = 0; i < qSize; ++i) { + SRSmaExecQItem *pItem = taosArrayGet(pSubmitQArr, i); + while (1) { + void *msg = NULL; + taosGetQitem(*(STaosQall **)pItem->qall, (void **)&msg); + if (msg) { + if (taosArrayPush(pSubmitArr, &msg) < 0) { + tdFreeRSmaSubmitItems(pSubmitArr); + goto _err; + } + } else { + break; + } + } + + int32_t size = taosArrayGetSize(pSubmitArr); + if (size > 0) { + SRSmaInfo *pInfo = *(SRSmaInfo **)pItem->pRSmaInfo; + for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { + if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) { + tdFreeRSmaSubmitItems(pSubmitArr); + goto _err; + } + } + tdFreeRSmaSubmitItems(pSubmitArr); + taosArrayClear(pSubmitArr); + } + } + + // step 2: rsma fetch - consume data in buffer queue for suids triggered by timer + if (taosHashGetSize(RSMA_FETCH_HASH(pRSmaStat)) <= 0) { + goto _end; + } + pIter = taosHashIterate(RSMA_FETCH_HASH(pRSmaStat), NULL); + if (pIter) { + tdRSmaConsumeAndFetch(pSma, *(int64_t *)taosHashGetKey(pIter, NULL), *(int8_t *)pIter, pSubmitArr); + while ((pIter = taosHashIterate(RSMA_FETCH_HASH(pRSmaStat), pIter))) { + tdRSmaConsumeAndFetch(pSma, *(int64_t *)taosHashGetKey(pIter, NULL), *(int8_t *)pIter, pSubmitArr); + } + } + +_end: + taosArrayDestroy(pSubmitArr); + taosArrayDestroy(pSubmitQArr); + return TSDB_CODE_SUCCESS; +_err: + taosArrayDestroy(pSubmitArr); + taosArrayDestroy(pSubmitQArr); + return TSDB_CODE_FAILED; +} + +/** + * @brief exec rsma level 1data, fetch result of level 2/3 and submit + * + * @param pSma + * @param pMsg + * @return int32_t + */ +int32_t smaProcessExec(SSma *pSma, void *pMsg) { + SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg; + SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); + + if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) { + terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP; + goto _err; + } + smaDebug("vgId:%d, begin to process rsma exec msg by TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); + if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_OVERFLOW) < 0) { + goto _err; + } + + atomic_store_8(&pRSmaStat->execStat, 0); + smaDebug("vgId:%d, success to process rsma exec msg by TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); + return TSDB_CODE_SUCCESS; +_err: + atomic_store_8(&pRSmaStat->execStat, 0); + smaError("vgId:%d, failed to process rsma exec msg by TID:%p since %s", SMA_VID(pSma), (void *)taosGetSelfPthreadId(), + terrstr()); + return TSDB_CODE_FAILED; +} diff --git a/source/dnode/vnode/src/sma/smaSnapshot.c b/source/dnode/vnode/src/sma/smaSnapshot.c index fbcd2af751..335c15a539 100644 --- a/source/dnode/vnode/src/sma/smaSnapshot.c +++ b/source/dnode/vnode/src/sma/smaSnapshot.c @@ -139,7 +139,6 @@ static int32_t rsmaSnapReadQTaskInfo(SRsmaSnapReader* pReader, uint8_t** ppBuf) smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo, size:%" PRIi64, SMA_VID(pSma), size); - SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppBuf); pHdr->type = SNAP_DATA_QTASK; pHdr->size = size; @@ -279,7 +278,8 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWrit TdFilePtr qTaskF = taosCreateFile(qTaskInfoFullName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (!qTaskF) { code = TAOS_SYSTEM_ERROR(errno); - smaError("vgId:%d, rsma snapshot writer open %s failed since %s", TD_VID(pSma->pVnode), qTaskInfoFullName, tstrerror(code)); + smaError("vgId:%d, rsma snapshot writer open %s failed since %s", TD_VID(pSma->pVnode), qTaskInfoFullName, + tstrerror(code)); goto _err; } qWriter->pWriteH = qTaskF; @@ -309,7 +309,7 @@ int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) { if (rollback) { // TODO: rsma1/rsma2 // qtaskinfo - if(pWriter->pQTaskFWriter) { + if (pWriter->pQTaskFWriter) { taosRemoveFile(pWriter->pQTaskFWriter->fname); } } else { diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index b09d7e3c23..1687cd46a0 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -175,7 +175,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { } tdRefSmaStat(pSma, pStat); - pTsmaStat = SMA_TSMA_STAT(pStat); + pTsmaStat = SMA_STAT_TSMA(pStat); if (!pTsmaStat->pTSma) { STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid); diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index d9f38ffd09..da70222485 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -350,49 +350,45 @@ _err: } /** - * @brief pTSchema is shared + * @brief Clone qTaskInfo of SRSmaInfo * * @param pSma - * @param pDest - * @param pSrc + * @param pInfo * @return int32_t */ -int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo **pDest, SRSmaInfo *pSrc) { - SVnode *pVnode = pSma->pVnode; +int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { SRSmaParam *param = NULL; - if (!pSrc) { - *pDest = NULL; + if (!pInfo) { return TSDB_CODE_SUCCESS; } SMetaReader mr = {0}; metaReaderInit(&mr, SMA_META(pSma), 0); - smaDebug("vgId:%d, rsma clone, suid is %" PRIi64, TD_VID(pVnode), pSrc->suid); - if (metaGetTableEntryByUid(&mr, pSrc->suid) < 0) { - smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), pSrc->suid, + smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid); + if (metaGetTableEntryByUid(&mr, pInfo->suid) < 0) { + smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid, terrstr()); goto _err; } ASSERT(mr.me.type == TSDB_SUPER_TABLE); - ASSERT(mr.me.uid == pSrc->suid); + ASSERT(mr.me.uid == pInfo->suid); if (TABLE_IS_ROLLUP(mr.me.flags)) { param = &mr.me.stbEntry.rsmaParam; for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { - if (tdCloneQTaskInfo(pSma, pSrc->iTaskInfo[i], pSrc->taskInfo[i], param, pSrc->suid, i) < 0) { + if (tdCloneQTaskInfo(pSma, pInfo->taskInfo[i], pInfo->iTaskInfo[i], param, pInfo->suid, i) < 0) { goto _err; } } - smaDebug("vgId:%d, rsma clone env success for %" PRIi64, TD_VID(pVnode), pSrc->suid); + smaDebug("vgId:%d, rsma clone env success for %" PRIi64, SMA_VID(pSma), pInfo->suid); + } else { + terrno = TSDB_CODE_RSMA_INVALID_SCHEMA; + goto _err; } metaReaderClear(&mr); - - *pDest = pSrc; // pointer copy - return TSDB_CODE_SUCCESS; _err: - *pDest = NULL; metaReaderClear(&mr); - smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", TD_VID(pVnode), pSrc->suid, terrstr()); + smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid, terrstr()); return TSDB_CODE_FAILED; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1458dd01f2..366f1121e5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -569,8 +569,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe } int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { - int32_t code = 0; - if (pTask->taskLevel == TASK_LEVEL__AGG) { ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0); } @@ -581,8 +579,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { pTask->outputQueue = streamQueueOpen(); if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) { - code = -1; - goto FAIL; + return -1; } pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; @@ -627,14 +624,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { streamSetupTrigger(pTask); - tqInfo("deploy stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId, + tqInfo("expand stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId, pTask->selfChildId); - -FAIL: - if (pTask->inputQueue) streamQueueClose(pTask->inputQueue); - if (pTask->outputQueue) streamQueueClose(pTask->outputQueue); - // TODO free executor - return code; + return 0; } int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 55630511bf..522bf46aa1 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -231,34 +231,35 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ASSERT(pTask->tbSink.pTSchema); deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); - SSubmitReq* pReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, - pTask->tbSink.stbFullName, &deleteReq); + SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, + pTask->tbSink.stbFullName, &deleteReq); tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId); - int32_t code; - int32_t len; - tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); - if (code < 0) { - // - ASSERT(0); - } - SEncoder encoder; - void* buf = rpcMallocCont(len + sizeof(SMsgHead)); - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tEncoderInit(&encoder, abuf, len); - tEncodeSBatchDeleteReq(&encoder, &deleteReq); - tEncoderClear(&encoder); - - ((SMsgHead*)buf)->vgId = pVnode->config.vgId; - if (taosArrayGetSize(deleteReq.deleteReqs) != 0) { + int32_t code; + int32_t len; + tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); + if (code < 0) { + // + ASSERT(0); + } + SEncoder encoder; + void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead)); + void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead)); + tEncoderInit(&encoder, abuf, len); + tEncodeSBatchDeleteReq(&encoder, &deleteReq); + tEncoderClear(&encoder); + + ((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId; + SRpcMsg msg = { .msgType = TDMT_VND_BATCH_DEL, - .pCont = buf, + .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead), }; if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { + rpcFreeCont(serializedDeleteReq); tqDebug("failed to put into write-queue since %s", terrstr()); } } @@ -268,11 +269,12 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { // build write msg SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, - .pCont = pReq, - .contLen = ntohl(pReq->length), + .pCont = submitReq, + .contLen = ntohl(submitReq->length), }; if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { + rpcFreeCont(submitReq); tqDebug("failed to put into write-queue since %s", terrstr()); } } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 2f57bd7d3f..70b4ffb637 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2585,32 +2585,6 @@ void* tsdbGetIvtIdx(SMeta* pMeta) { uint64_t getReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; } -/** - * @brief Get all suids since suid - * - * @param pMeta - * @param suid return all suids in one vnode if suid is 0 - * @param list - * @return int32_t - */ -int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) { - SMStbCursor* pCur = metaOpenStbCursor(pMeta, suid); - if (!pCur) { - return TSDB_CODE_FAILED; - } - - while (1) { - tb_uid_t id = metaStbCursorNext(pCur); - if (id == 0) { - break; - } - - taosArrayPush(list, &id); - } - - metaCloseStbCursor(pCur); - return TSDB_CODE_SUCCESS; -} // ====================================== EXPOSED APIs ====================================== int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader, diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 0623b3bd10..5a22114ab4 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -78,7 +78,7 @@ void vnodeBufPoolReset(SVBufPool *pPool) { void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { SVBufPoolNode *pNode; void *p; - + taosThreadSpinLock(&pPool->lock); if (pPool->node.size >= pPool->ptr - pPool->node.data + size) { // allocate from the anchor node p = pPool->ptr; @@ -89,6 +89,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { pNode = taosMemoryMalloc(sizeof(*pNode) + size); if (pNode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + taosThreadSpinUnlock(&pPool->lock); return NULL; } @@ -101,7 +102,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { pPool->size = pPool->size + sizeof(*pNode) + size; } - + taosThreadSpinUnlock(&pPool->lock); return p; } @@ -129,6 +130,12 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) return -1; } + if (taosThreadSpinInit(&pPool->lock, 0) != 0) { + taosMemoryFree(pPool); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + pPool->next = NULL; pPool->pVnode = pVnode; pPool->nRef = 0; @@ -145,6 +152,7 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) static int vnodeBufPoolDestroy(SVBufPool *pPool) { vnodeBufPoolReset(pPool); + taosThreadSpinDestroy(&pPool->lock); taosMemoryFree(pPool); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index c8dc07af0a..64f223b974 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -220,6 +220,10 @@ int vnodeCommit(SVnode *pVnode) { vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID, pVnode->state.applied); + // preCommit + // smaSyncPreCommit(pVnode->pSma); + smaAsyncPreCommit(pVnode->pSma); + vnodeBufPoolUnRef(pVnode->inUse); pVnode->inUse = NULL; @@ -237,10 +241,6 @@ int vnodeCommit(SVnode *pVnode) { } walBeginSnapshot(pVnode->pWal, pVnode->state.applied); - // preCommit - // smaSyncPreCommit(pVnode->pSma); - smaAsyncPreCommit(pVnode->pSma); - // commit each sub-system if (metaCommit(pVnode->pMeta) < 0) { ASSERT(0); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index d55f1796ad..8d799e919d 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -424,6 +424,25 @@ int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list) { return TSDB_CODE_SUCCESS; } +int32_t vnodeGetStbIdList(SVnode* pVnode, int64_t suid, SArray* list) { + SMStbCursor* pCur = metaOpenStbCursor(pVnode->pMeta, suid); + if (!pCur) { + return TSDB_CODE_FAILED; + } + + while (1) { + tb_uid_t id = metaStbCursorNext(pCur); + if (id == 0) { + break; + } + + taosArrayPush(list, &id); + } + + metaCloseStbCursor(pCur); + return TSDB_CODE_SUCCESS; +} + int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) { SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, suid); if (!pCur) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 54a5aacbd2..c73d2ccfd5 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -303,6 +303,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0); case TDMT_VND_FETCH_RSMA: return smaProcessFetch(pVnode->pSma, pMsg); + case TDMT_VND_EXEC_RSMA: + return smaProcessExec(pVnode->pSma, pMsg); default: vError("unknown msg type:%d in query queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; @@ -530,7 +532,9 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR } tqUpdateTbUidList(pVnode->pTq, tbUids, true); - tdUpdateTbUidList(pVnode->pSma, pStore); + if (tdUpdateTbUidList(pVnode->pSma, pStore) < 0) { + goto _exit; + } tdUidStoreFree(pStore); // prepare rsp diff --git a/source/libs/executor/inc/tsimplehash.h b/source/libs/executor/inc/tsimplehash.h index a56f8e8c04..4c5a80e2f1 100644 --- a/source/libs/executor/inc/tsimplehash.h +++ b/source/libs/executor/inc/tsimplehash.h @@ -17,7 +17,6 @@ #define TDENGINE_TSIMPLEHASH_H #include "tarray.h" -#include "tlockfree.h" #ifdef __cplusplus extern "C" { @@ -27,6 +26,10 @@ typedef uint32_t (*_hash_fn_t)(const char *, uint32_t); typedef int32_t (*_equal_fn_t)(const void *, const void *, size_t len); typedef void (*_hash_free_fn_t)(void *); +/** + * @brief single thread hash + * + */ typedef struct SSHashObj SSHashObj; /** @@ -36,7 +39,7 @@ typedef struct SSHashObj SSHashObj; * @param fn hash function to generate the hash value * @return */ -SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t dataLen); +SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn); /** * return the size of hash table @@ -48,22 +51,26 @@ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj); int32_t tSimpleHashPrint(const SSHashObj *pHashObj); /** - * put element into hash table, if the element with the same key exists, update it - * @param pHashObj - * @param key - * @param data - * @return + * @brief put element into hash table, if the element with the same key exists, update it + * + * @param pHashObj + * @param key + * @param keyLen + * @param data + * @param dataLen + * @return int32_t */ -int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data); +int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t dataLen); /** * return the payload data with the specified key * * @param pHashObj * @param key + * @param keyLen * @return */ -void *tSimpleHashGet(SSHashObj *pHashObj, const void *key); +void *tSimpleHashGet(SSHashObj *pHashObj, const void *key, size_t keyLen); /** * remove item with the specified key @@ -71,7 +78,7 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key); * @param key * @param keyLen */ -int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key); +int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen); /** * Clear the hash table. @@ -98,7 +105,7 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj); * @param keyLen * @return */ -void *tSimpleHashGetKey(const SSHashObj* pHashObj, void *data, size_t* keyLen); +void *tSimpleHashGetKey(void *data, size_t* keyLen); /** * Create the hash table iterator @@ -109,7 +116,18 @@ void *tSimpleHashGetKey(const SSHashObj* pHashObj, void *data, size_t* keyLen); */ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter); +/** + * Create the hash table iterator + * + * @param pHashObj + * @param data + * @param key + * @param iter + * @return void* + */ +void *tSimpleHashIterateKV(const SSHashObj *pHashObj, void *data, void **key, int32_t *iter); + #ifdef __cplusplus } #endif -#endif // TDENGINE_TSIMPLEHASH_H +#endif // TDENGINE_TSIMPLEHASH_H \ No newline at end of file diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 945a94eb36..1564478734 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -55,7 +55,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu taosArrayClear(pInfo->pBlockLists); if (type == STREAM_INPUT__MERGED_SUBMIT) { - ASSERT(numOfBlocks > 1); + // ASSERT(numOfBlocks > 1); for (int32_t i = 0; i < numOfBlocks; i++) { SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*)); taosArrayPush(pInfo->pBlockLists, &pReq); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 6418f5305c..757ab09d1a 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2154,7 +2154,7 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock) { int32_t rows = pResBlock->info.rows; - + blockDataEnsureCapacity(pResBlock, rows + 1); // todo set the correct primary timestamp column // output the result diff --git a/source/libs/executor/src/tsimplehash.c b/source/libs/executor/src/tsimplehash.c index 7989ad2b5a..6b2edf0d5e 100644 --- a/source/libs/executor/src/tsimplehash.c +++ b/source/libs/executor/src/tsimplehash.c @@ -14,8 +14,8 @@ */ #include "tsimplehash.h" -#include "os.h" #include "taoserror.h" +#include "tlog.h" #define SHASH_DEFAULT_LOAD_FACTOR 0.75 #define HASH_MAX_CAPACITY (1024 * 1024 * 16) @@ -31,19 +31,21 @@ taosMemoryFreeClear(_n); \ } while (0); +#pragma pack(push, 4) typedef struct SHNode { struct SHNode *next; + uint32_t keyLen : 20; + uint32_t dataLen : 12; char data[]; } SHNode; +#pragma pack(pop) struct SSHashObj { SHNode **hashList; size_t capacity; // number of slots - int64_t size; // number of elements in hash table - _hash_fn_t hashFp; // hash function - _equal_fn_t equalFp; // equal function - int32_t keyLen; - int32_t dataLen; + int64_t size; // number of elements in hash table + _hash_fn_t hashFp; // hash function + _equal_fn_t equalFp; // equal function }; static FORCE_INLINE int32_t taosHashCapacity(int32_t length) { @@ -54,7 +56,7 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) { return i; } -SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t dataLen) { +SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) { ASSERT(fn != NULL); if (capacity == 0) { @@ -74,8 +76,6 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t pHashObj->hashFp = fn; ASSERT((pHashObj->capacity & (pHashObj->capacity - 1)) == 0); - pHashObj->keyLen = keyLen; - pHashObj->dataLen = dataLen; pHashObj->hashList = (SHNode **)taosMemoryCalloc(pHashObj->capacity, sizeof(void *)); if (!pHashObj->hashList) { @@ -93,40 +93,41 @@ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj) { return (int32_t)atomic_load_64((int64_t *)&pHashObj->size); } -static SHNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) { - SHNode *pNewNode = taosMemoryMalloc(sizeof(SHNode) + keyLen + dsize); +static SHNode *doCreateHashNode(const void *key, size_t keyLen, const void *data, size_t dataLen, uint32_t hashVal) { + SHNode *pNewNode = taosMemoryMalloc(sizeof(SHNode) + keyLen + dataLen); if (!pNewNode) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - + pNewNode->keyLen = keyLen; + pNewNode->dataLen = dataLen; pNewNode->next = NULL; - memcpy(GET_SHASH_NODE_DATA(pNewNode), pData, dsize); - memcpy(GET_SHASH_NODE_KEY(pNewNode, dsize), key, keyLen); + memcpy(GET_SHASH_NODE_DATA(pNewNode), data, dataLen); + memcpy(GET_SHASH_NODE_KEY(pNewNode, dataLen), key, keyLen); return pNewNode; } -static void taosHashTableResize(SSHashObj *pHashObj) { +static void tSimpleHashTableResize(SSHashObj *pHashObj) { if (!SHASH_NEED_RESIZE(pHashObj)) { return; } int32_t newCapacity = (int32_t)(pHashObj->capacity << 1u); if (newCapacity > HASH_MAX_CAPACITY) { - // uDebug("current capacity:%zu, maximum capacity:%d, no resize applied due to limitation is reached", - // pHashObj->capacity, HASH_MAX_CAPACITY); + uDebug("current capacity:%zu, maximum capacity:%" PRIu64 ", no resize applied due to limitation is reached", + pHashObj->capacity, HASH_MAX_CAPACITY); return; } int64_t st = taosGetTimestampUs(); void *pNewEntryList = taosMemoryRealloc(pHashObj->hashList, sizeof(void *) * newCapacity); if (!pNewEntryList) { - // qWarn("hash resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity); + uWarn("hash resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity); return; } size_t inc = newCapacity - pHashObj->capacity; - memset((char *)pNewEntryList + pHashObj->capacity * sizeof(void *), 0, inc); + memset((char *)pNewEntryList + pHashObj->capacity * sizeof(void *), 0, inc * sizeof(void *)); pHashObj->hashList = pNewEntryList; pHashObj->capacity = newCapacity; @@ -141,8 +142,8 @@ static void taosHashTableResize(SSHashObj *pHashObj) { SHNode *pPrev = NULL; while (pNode != NULL) { - void *key = GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen); - uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->keyLen); + void *key = GET_SHASH_NODE_KEY(pNode, pNode->dataLen); + uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pNode->keyLen); int32_t newIdx = HASH_INDEX(hashVal, pHashObj->capacity); pNext = pNode->next; @@ -170,23 +171,23 @@ static void taosHashTableResize(SSHashObj *pHashObj) { // ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0); } -int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) { +int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t dataLen) { if (!pHashObj || !key) { return -1; } - uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->keyLen); + uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); // need the resize process, write lock applied if (SHASH_NEED_RESIZE(pHashObj)) { - taosHashTableResize(pHashObj); + tSimpleHashTableResize(pHashObj); } int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHNode *pNode = pHashObj->hashList[slot]; if (!pNode) { - SHNode *pNewNode = doCreateHashNode(key, pHashObj->keyLen, data, pHashObj->dataLen, hashVal); + SHNode *pNewNode = doCreateHashNode(key, keyLen, data, dataLen, hashVal); if (!pNewNode) { return -1; } @@ -197,14 +198,14 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) { } while (pNode) { - if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen), key, pHashObj->keyLen) == 0) { + if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) { break; } pNode = pNode->next; } if (!pNode) { - SHNode *pNewNode = doCreateHashNode(key, pHashObj->keyLen, data, pHashObj->dataLen, hashVal); + SHNode *pNewNode = doCreateHashNode(key, keyLen, data, dataLen, hashVal); if (!pNewNode) { return -1; } @@ -212,16 +213,16 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) { pHashObj->hashList[slot] = pNewNode; atomic_add_fetch_64(&pHashObj->size, 1); } else { // update data - memcpy(GET_SHASH_NODE_DATA(pNode), data, pHashObj->dataLen); + memcpy(GET_SHASH_NODE_DATA(pNode), data, dataLen); } return 0; } -static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void *key, int32_t index) { +static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void *key, size_t keyLen, int32_t index) { SHNode *pNode = pHashObj->hashList[index]; while (pNode) { - if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen), key, pHashObj->keyLen) == 0) { + if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) { break; } @@ -233,12 +234,12 @@ static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void static FORCE_INLINE bool taosHashTableEmpty(const SSHashObj *pHashObj) { return tSimpleHashGetSize(pHashObj) == 0; } -void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) { +void *tSimpleHashGet(SSHashObj *pHashObj, const void *key, size_t keyLen) { if (!pHashObj || taosHashTableEmpty(pHashObj) || !key) { return NULL; } - uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->keyLen); + uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHNode *pNode = pHashObj->hashList[slot]; @@ -247,7 +248,7 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) { } char *data = NULL; - pNode = doSearchInEntryList(pHashObj, key, slot); + pNode = doSearchInEntryList(pHashObj, key, keyLen, slot); if (pNode != NULL) { data = GET_SHASH_NODE_DATA(pNode); } @@ -255,19 +256,19 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) { return data; } -int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key) { +int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) { if (!pHashObj || !key) { return TSDB_CODE_FAILED; } - uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->keyLen); + uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHNode *pNode = pHashObj->hashList[slot]; SHNode *pPrev = NULL; while (pNode) { - if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen), key, pHashObj->keyLen) == 0) { + if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) { if (!pPrev) { pHashObj->hashList[slot] = pNode->next; } else { @@ -312,6 +313,7 @@ void tSimpleHashCleanup(SSHashObj *pHashObj) { tSimpleHashClear(pHashObj); taosMemoryFreeClear(pHashObj->hashList); + taosMemoryFree(pHashObj); } size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj) { @@ -322,23 +324,13 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj) { return (pHashObj->capacity * sizeof(void *)) + sizeof(SHNode) * tSimpleHashGetSize(pHashObj) + sizeof(SSHashObj); } -void *tSimpleHashGetKey(const SSHashObj *pHashObj, void *data, size_t *keyLen) { -#if 0 - int32_t offset = offsetof(SHNode, data); - SHNode *node = ((SHNode *)(char *)data - offset); +void *tSimpleHashGetKey(void *data, size_t *keyLen) { + SHNode *node = (SHNode *)((char *)data - offsetof(SHNode, data)); if (keyLen) { - *keyLen = pHashObj->keyLen; + *keyLen = node->keyLen; } - return POINTER_SHIFT(data, pHashObj->dataLen); - - return GET_SHASH_NODE_KEY(node, pHashObj->dataLen); -#endif - if (keyLen) { - *keyLen = pHashObj->keyLen; - } - - return POINTER_SHIFT(data, pHashObj->dataLen); + return POINTER_SHIFT(data, node->dataLen); } void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) { @@ -376,5 +368,52 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) { return GET_SHASH_NODE_DATA(pNode); } + return NULL; +} + +void *tSimpleHashIterateKV(const SSHashObj *pHashObj, void *data, void **key, int32_t *iter) { + if (!pHashObj) { + return NULL; + } + + SHNode *pNode = NULL; + + if (!data) { + for (int32_t i = 0; i < pHashObj->capacity; ++i) { + pNode = pHashObj->hashList[i]; + if (!pNode) { + continue; + } + *iter = i; + if (key) { + *key = GET_SHASH_NODE_KEY(pNode, pNode->dataLen); + } + return GET_SHASH_NODE_DATA(pNode); + } + return NULL; + } + + pNode = (SHNode *)((char *)data - offsetof(SHNode, data)); + + if (pNode->next) { + if (key) { + *key = GET_SHASH_NODE_KEY(pNode->next, pNode->next->dataLen); + } + return GET_SHASH_NODE_DATA(pNode->next); + } + + ++(*iter); + for (int32_t i = *iter; i < pHashObj->capacity; ++i) { + pNode = pHashObj->hashList[i]; + if (!pNode) { + continue; + } + *iter = i; + if (key) { + *key = GET_SHASH_NODE_KEY(pNode, pNode->dataLen); + } + return GET_SHASH_NODE_DATA(pNode); + } + return NULL; } \ No newline at end of file diff --git a/source/libs/executor/test/tSimpleHashTests.cpp b/source/libs/executor/test/tSimpleHashTests.cpp index a17a7146ea..acb6d434b4 100644 --- a/source/libs/executor/test/tSimpleHashTests.cpp +++ b/source/libs/executor/test/tSimpleHashTests.cpp @@ -32,31 +32,33 @@ TEST(testCase, tSimpleHashTest) { SSHashObj *pHashObj = - tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), sizeof(int64_t), sizeof(int64_t)); + tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); assert(pHashObj != nullptr); ASSERT_EQ(0, tSimpleHashGetSize(pHashObj)); + size_t keyLen = sizeof(int64_t); + size_t dataLen = sizeof(int64_t); + int64_t originKeySum = 0; for (int64_t i = 1; i <= 100; ++i) { originKeySum += i; - tSimpleHashPut(pHashObj, (const void *)&i, (const void *)&i); + tSimpleHashPut(pHashObj, (const void *)&i, keyLen, (const void *)&i, dataLen); ASSERT_EQ(i, tSimpleHashGetSize(pHashObj)); } for (int64_t i = 1; i <= 100; ++i) { - void *data = tSimpleHashGet(pHashObj, (const void *)&i); + void *data = tSimpleHashGet(pHashObj, (const void *)&i, keyLen); ASSERT_EQ(i, *(int64_t *)data); } - void *data = NULL; int32_t iter = 0; int64_t keySum = 0; int64_t dataSum = 0; while ((data = tSimpleHashIterate(pHashObj, data, &iter))) { - void *key = tSimpleHashGetKey(pHashObj, data, NULL); + void *key = tSimpleHashGetKey(data, NULL); keySum += *(int64_t *)key; dataSum += *(int64_t *)data; } @@ -65,7 +67,7 @@ TEST(testCase, tSimpleHashTest) { ASSERT_EQ(keySum, originKeySum); for (int64_t i = 1; i <= 100; ++i) { - tSimpleHashRemove(pHashObj, (const void *)&i); + tSimpleHashRemove(pHashObj, (const void *)&i, keyLen); ASSERT_EQ(100 - i, tSimpleHashGetSize(pHashObj)); } diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 3776cb261f..6e30eeaa86 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -32,7 +32,6 @@ typedef struct { static SStreamGlobalEnv streamEnv; -int32_t streamExec(SStreamTask* pTask); int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch); int32_t streamDispatch(SStreamTask* pTask); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 6da7d4fd59..d6e87c2736 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -185,7 +185,9 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S tFreeStreamDispatchReq(pReq); if (exec) { - streamTryExec(pTask); + if (streamTryExec(pTask) < 0) { + return -1; + } if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { streamDispatch(pTask); @@ -221,7 +223,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) { } int32_t streamProcessRunReq(SStreamTask* pTask) { - streamTryExec(pTask); + if (streamTryExec(pTask) < 0) { + return -1; + } if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { streamDispatch(pTask); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 64a9537e6c..5ff700546c 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -15,6 +15,7 @@ #include "executor.h" #include "tstream.h" +#include "ttimer.h" SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) { SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); @@ -99,16 +100,19 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* goto FAIL; } - taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); + if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { + goto FAIL; + } if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), msg, msgLen, &pMeta->txn) < 0) { + taosHashRemove(pMeta->pTasks, &pTask->taskId, sizeof(int32_t)); ASSERT(0); - return -1; + goto FAIL; } return 0; FAIL: - if (pTask) taosMemoryFree(pTask); + if (pTask) tFreeSStreamTask(pTask); return -1; } @@ -158,11 +162,28 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask* pTask = *ppTask; taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING); + + if (tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), &pMeta->txn) < 0) { + /*return -1;*/ + } + + if (pTask->triggerParam != 0) { + taosTmrStop(pTask->timer); + } + + while (1) { + int8_t schedStatus = + atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__DROPPING); + if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { + tFreeSStreamTask(pTask); + break; + } else if (schedStatus == TASK_SCHED_STATUS__DROPPING) { + break; + } + taosMsleep(10); + } } - if (tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), &pMeta->txn) < 0) { - /*return -1;*/ - } return 0; } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 6819e5329f..ac10c82587 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "tstream.h" +#include "streamInc.h" SStreamQueue* streamQueueOpen() { SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue)); @@ -36,9 +36,12 @@ void streamQueueClose(SStreamQueue* queue) { while (1) { void* qItem = streamQueueNextItem(queue); if (qItem) { - taosFreeQitem(qItem); + streamFreeQitem(qItem); } else { - return; + break; } } + taosFreeQall(queue->qall); + taosCloseQueue(queue->queue); + taosMemoryFree(queue); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 638d39e5cc..d588d90543 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -152,9 +152,17 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { } void tFreeSStreamTask(SStreamTask* pTask) { - streamQueueClose(pTask->inputQueue); - streamQueueClose(pTask->outputQueue); + if (pTask->inputQueue) streamQueueClose(pTask->inputQueue); + if (pTask->outputQueue) streamQueueClose(pTask->outputQueue); if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg); if (pTask->exec.executor) qDestroyTask(pTask->exec.executor); + taosArrayDestroy(pTask->childEpInfo); + if (pTask->outputType == TASK_OUTPUT__TABLE) { + tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper); + taosMemoryFree(pTask->tbSink.pTSchema); + } + if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); + } taosMemoryFree(pTask); } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index de43c81654..0afc373f2d 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -237,7 +237,7 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode); bool syncNodeHasSnapshot(SSyncNode* pSyncNode); void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode); -SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode); +SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode); SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode); int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 3829ea0730..1e68fe346c 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -146,23 +146,47 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) { int64_t timeNow = taosGetTimestampMs(); for (int i = 0; i < pSyncNode->peersNum; ++i) { - int64_t peerStartTime = syncIndexMgrGetStartTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]); - int64_t peerRecvTime = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]); + int64_t peerStartTime = syncIndexMgrGetStartTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]); + int64_t peerRecvTime = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]); + SyncIndex peerMatchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId)[i]); - int64_t recvTimeDiff = syncNodeAbs64(peerRecvTime, timeNow); - int64_t startTimeDiff = syncNodeAbs64(peerStartTime, pSyncNode->startTime); + int64_t recvTimeDiff = TABS(peerRecvTime - timeNow); + int64_t startTimeDiff = TABS(peerStartTime - pSyncNode->startTime); + int64_t logDiff = TABS(peerMatchIndex - syncNodeGetLastIndex(pSyncNode)); + + /* + int64_t recvTimeDiff = syncNodeAbs64(peerRecvTime, timeNow); + int64_t startTimeDiff = syncNodeAbs64(peerStartTime, pSyncNode->startTime); + int64_t logDiff = syncNodeAbs64(peerMatchIndex, syncNodeGetLastIndex(pSyncNode)); + */ int32_t addQuorum = 0; if (recvTimeDiff < SYNC_MAX_RECV_TIME_RANGE_MS) { - addQuorum = 1; + if (startTimeDiff < SYNC_MAX_START_TIME_RANGE_MS) { + addQuorum = 1; + } else { + if (logDiff < SYNC_ADD_QUORUM_COUNT) { + addQuorum = 1; + } else { + addQuorum = 0; + } + } } else { addQuorum = 0; } - if (startTimeDiff > SYNC_MAX_START_TIME_RANGE_MS) { - addQuorum = 0; - } + /* + if (recvTimeDiff < SYNC_MAX_RECV_TIME_RANGE_MS) { + addQuorum = 1; + } else { + addQuorum = 0; + } + + if (startTimeDiff > SYNC_MAX_START_TIME_RANGE_MS) { + addQuorum = 0; + } + */ quorum += addQuorum; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index a00b59d292..3fe600ecbb 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2284,7 +2284,7 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) { // return max(logLastIndex, snapshotLastIndex) // if no snapshot and log, return -1 -SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) { +SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) { SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1}; if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); @@ -2773,11 +2773,27 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p return 0; } - if (ths->vgId > 1) { - syncNodeEventLog(ths, "I am vnode, can not do leader transfer"); + if (pEntry->term < ths->pRaftStore->currentTerm) { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "little term:%lu, can not do leader transfer", pEntry->term); + syncNodeEventLog(ths, logBuf); return 0; } + if (pEntry->index < syncNodeGetLastIndex(ths)) { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "little index:%ld, can not do leader transfer", pEntry->index); + syncNodeEventLog(ths, logBuf); + return 0; + } + + /* + if (ths->vgId > 1) { + syncNodeEventLog(ths, "I am vnode, can not do leader transfer"); + return 0; + } + */ + do { char logBuf[128]; snprintf(logBuf, sizeof(logBuf), "do leader transfer, index:%ld", pEntry->index); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 7b06967940..662a3f0c88 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -293,6 +293,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_ALREADY_EXIST, "Stream already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_NOT_EXIST, "Stream not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_OPTION, "Invalid stream option") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_MUST_BE_DELETED, "Stream must be dropped first") // mnode-sma TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists") @@ -616,6 +617,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FILE_CORRUPTED, "Rsma file corrupted TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is messed up") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty") +TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_SCHEMA, "Rsma invalid schema") //index TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")