Merge branch '3.0' into feature/TD-14761
This commit is contained in:
commit
fbc69a10c4
|
@ -43,6 +43,7 @@ def pre_test(){
|
|||
cd ${WKC}
|
||||
git reset --hard
|
||||
git clean -fxd
|
||||
rm -rf examples/rust/
|
||||
git remote prune origin
|
||||
git fetch
|
||||
'''
|
||||
|
|
|
@ -98,10 +98,9 @@ int32_t create_stream() {
|
|||
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
|
||||
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
||||
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
||||
pRes =
|
||||
taos_query(pConn,
|
||||
"create stream stream1 trigger max_delay 10s into outstb as select _wstart, sum(k) from st1 partition "
|
||||
"by tbname session(ts, 10s) ");
|
||||
pRes = taos_query(pConn,
|
||||
"create stream stream1 trigger max_delay 10s watermark 10s into outstb as select _wstart start, "
|
||||
"count(k) from st1 partition by tbname interval(20s) ");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
|
|
|
@ -60,6 +60,7 @@ enum {
|
|||
STREAM_INPUT__DATA_RETRIEVE,
|
||||
STREAM_INPUT__GET_RES,
|
||||
STREAM_INPUT__CHECKPOINT,
|
||||
STREAM_INPUT__DESTROY,
|
||||
};
|
||||
|
||||
typedef enum EStreamType {
|
||||
|
|
|
@ -2663,6 +2663,10 @@ typedef struct {
|
|||
SEpSet epSet;
|
||||
} SVgEpSet;
|
||||
|
||||
typedef struct {
|
||||
int32_t padding;
|
||||
} SRSmaExecMsg;
|
||||
|
||||
typedef struct {
|
||||
int64_t suid;
|
||||
int8_t level;
|
||||
|
|
|
@ -202,6 +202,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_FETCH_RSMA, "vnode-fetch-rsma", SRSmaFetchMsg, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_EXEC_RSMA, "vnode-exec-rsma", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_BATCH_DEL, "batch-delete", SBatchDeleteReq, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
|
||||
|
|
|
@ -53,6 +53,7 @@ enum {
|
|||
TASK_SCHED_STATUS__WAITING,
|
||||
TASK_SCHED_STATUS__ACTIVE,
|
||||
TASK_SCHED_STATUS__FAILED,
|
||||
TASK_SCHED_STATUS__DROPPING,
|
||||
};
|
||||
|
||||
enum {
|
||||
|
@ -127,6 +128,10 @@ typedef struct {
|
|||
int8_t type;
|
||||
} SStreamCheckpoint;
|
||||
|
||||
typedef struct {
|
||||
int8_t type;
|
||||
} SStreamTaskDestroy;
|
||||
|
||||
typedef struct {
|
||||
int8_t type;
|
||||
SSDataBlock* pBlock;
|
||||
|
@ -211,7 +216,6 @@ typedef struct {
|
|||
void* vnode;
|
||||
FTbSink* tbSinkFunc;
|
||||
STSchema* pTSchema;
|
||||
SHashObj* pHash; // groupId to tbuid
|
||||
} STaskSinkTb;
|
||||
|
||||
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
|
||||
|
|
|
@ -34,6 +34,7 @@ extern bool gRaftDetailLog;
|
|||
#define SYNC_MAX_PROGRESS_WAIT_MS 4000
|
||||
#define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20)
|
||||
#define SYNC_MAX_RECV_TIME_RANGE_MS 1000
|
||||
#define SYNC_ADD_QUORUM_COUNT 3
|
||||
|
||||
#define SYNC_MAX_BATCH_SIZE 1
|
||||
#define SYNC_INDEX_BEGIN 0
|
||||
|
|
|
@ -291,6 +291,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_MND_STREAM_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F1)
|
||||
#define TSDB_CODE_MND_INVALID_STREAM_OPTION TAOS_DEF_ERROR_CODE(0, 0x03F2)
|
||||
#define TSDB_CODE_MND_STREAM_MUST_BE_DELETED TAOS_DEF_ERROR_CODE(0, 0x03F3)
|
||||
#define TSDB_CODE_MND_STREAM_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x03F4)
|
||||
|
||||
// mnode-sma
|
||||
#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480)
|
||||
|
@ -614,6 +615,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_RSMA_REMOVE_EXISTS TAOS_DEF_ERROR_CODE(0, 0x3154)
|
||||
#define TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x3155)
|
||||
#define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156)
|
||||
#define TSDB_CODE_RSMA_INVALID_SCHEMA TAOS_DEF_ERROR_CODE(0, 0x3157)
|
||||
|
||||
//index
|
||||
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
|
||||
|
|
|
@ -338,6 +338,7 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_EXEC_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -636,6 +636,7 @@ typedef struct {
|
|||
|
||||
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
|
||||
int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj);
|
||||
void tFreeStreamObj(SStreamObj* pObj);
|
||||
|
||||
typedef struct {
|
||||
char streamName[TSDB_STREAM_FNAME_LEN];
|
||||
|
|
|
@ -34,6 +34,7 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate);
|
|||
SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName);
|
||||
int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb);
|
||||
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb);
|
||||
void mndFreeStb(SStbObj *pStb);
|
||||
|
||||
void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst);
|
||||
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize);
|
||||
|
|
|
@ -116,6 +116,25 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tFreeStreamObj(SStreamObj *pStream) {
|
||||
taosMemoryFree(pStream->sql);
|
||||
taosMemoryFree(pStream->ast);
|
||||
taosMemoryFree(pStream->physicalPlan);
|
||||
if (pStream->outputSchema.nCols) taosMemoryFree(pStream->outputSchema.pSchema);
|
||||
|
||||
int32_t sz = taosArrayGetSize(pStream->tasks);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
|
||||
int32_t taskSz = taosArrayGetSize(pLevel);
|
||||
for (int32_t j = 0; j < taskSz; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pLevel, j);
|
||||
tFreeSStreamTask(pTask);
|
||||
}
|
||||
taosArrayDestroy(pLevel);
|
||||
}
|
||||
taosArrayDestroy(pStream->tasks);
|
||||
}
|
||||
|
||||
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
|
||||
SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
|
||||
if (pVgEpNew == NULL) return NULL;
|
||||
|
|
|
@ -424,6 +424,8 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
|
|||
}
|
||||
mndAddTaskToTaskSet(taskSourceLevel, pTask);
|
||||
|
||||
pTask->triggerParam = 0;
|
||||
|
||||
// source
|
||||
pTask->taskLevel = TASK_LEVEL__SOURCE;
|
||||
|
||||
|
|
|
@ -266,6 +266,15 @@ _OVER:
|
|||
return pRow;
|
||||
}
|
||||
|
||||
void mndFreeStb(SStbObj *pStb) {
|
||||
taosArrayDestroy(pStb->pFuncs);
|
||||
taosMemoryFreeClear(pStb->pColumns);
|
||||
taosMemoryFreeClear(pStb->pTags);
|
||||
taosMemoryFreeClear(pStb->comment);
|
||||
taosMemoryFreeClear(pStb->pAst1);
|
||||
taosMemoryFreeClear(pStb->pAst2);
|
||||
}
|
||||
|
||||
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) {
|
||||
mTrace("stb:%s, perform insert action, row:%p", pStb->name, pStb);
|
||||
return 0;
|
||||
|
@ -273,12 +282,7 @@ static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) {
|
|||
|
||||
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
|
||||
mTrace("stb:%s, perform delete action, row:%p", pStb->name, pStb);
|
||||
taosArrayDestroy(pStb->pFuncs);
|
||||
taosMemoryFreeClear(pStb->pColumns);
|
||||
taosMemoryFreeClear(pStb->pTags);
|
||||
taosMemoryFreeClear(pStb->comment);
|
||||
taosMemoryFreeClear(pStb->pAst1);
|
||||
taosMemoryFreeClear(pStb->pAst2);
|
||||
mndFreeStb(pStb);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -2021,8 +2025,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
|
|||
FOREACH(pNode, pNodeList) {
|
||||
SColumnNode *pCol = (SColumnNode *)pNode;
|
||||
|
||||
if (pCol->tableId != suid) {
|
||||
mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
|
||||
if (pCol->tableId == suid) {
|
||||
sdbRelease(pSdb, pTopic);
|
||||
nodesDestroyNode(pAst);
|
||||
return -1;
|
||||
|
@ -2045,6 +2048,16 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
|
|||
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if (pStream->smaId != 0) {
|
||||
sdbRelease(pSdb, pStream);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pStream->targetStbUid == suid) {
|
||||
sdbRelease(pSdb, pStream);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SNode *pAst = NULL;
|
||||
if (nodesStringToNode(pStream->ast, &pAst) != 0) {
|
||||
ASSERT(0);
|
||||
|
@ -2057,8 +2070,7 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
|
|||
FOREACH(pNode, pNodeList) {
|
||||
SColumnNode *pCol = (SColumnNode *)pNode;
|
||||
|
||||
if (pCol->tableId != suid) {
|
||||
mDebug("stream:%s, check colId:%d passed", pStream->name, pCol->colId);
|
||||
if (pCol->tableId == suid) {
|
||||
sdbRelease(pSdb, pStream);
|
||||
nodesDestroyNode(pAst);
|
||||
return -1;
|
||||
|
|
|
@ -167,6 +167,9 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
|
|||
|
||||
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
|
||||
mTrace("stream:%s, perform delete action", pStream->name);
|
||||
taosWLockLatch(&pStream->lock);
|
||||
tFreeStreamObj(pStream);
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -493,10 +496,17 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
|
|||
|
||||
stbObj.uid = pStream->targetStbUid;
|
||||
|
||||
if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER;
|
||||
if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) {
|
||||
mndFreeStb(&stbObj);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
tFreeSMCreateStbReq(&createReq);
|
||||
mndFreeStb(&stbObj);
|
||||
|
||||
return 0;
|
||||
_OVER:
|
||||
tFreeSMCreateStbReq(&createReq);
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
return -1;
|
||||
|
@ -715,6 +725,7 @@ _OVER:
|
|||
mndReleaseDb(pMnode, pDb);
|
||||
|
||||
tFreeSCMCreateStreamReq(&createStreamReq);
|
||||
tFreeStreamObj(&streamObj);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -63,6 +63,7 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId);
|
|||
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
|
||||
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list);
|
||||
int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list);
|
||||
int32_t vnodeGetStbIdList(SVnode *pVnode, int64_t suid, SArray* list);
|
||||
void *vnodeGetIdx(SVnode *pVnode);
|
||||
void *vnodeGetIvtIdx(SVnode *pVnode);
|
||||
|
||||
|
|
|
@ -60,6 +60,7 @@ typedef struct {
|
|||
#define SMA_ENV_LOCK(env) (&(env)->lock)
|
||||
#define SMA_ENV_TYPE(env) ((env)->type)
|
||||
#define SMA_ENV_STAT(env) ((env)->pStat)
|
||||
#define SMA_RSMA_STAT(sma) ((SRSmaStat *)SMA_ENV_STAT((SSmaEnv *)(sma)->pRSmaEnv))
|
||||
|
||||
struct STSmaStat {
|
||||
int8_t state; // ETsdbSmaStat
|
||||
|
@ -89,12 +90,14 @@ struct SRSmaStat {
|
|||
SSma *pSma;
|
||||
int64_t commitAppliedVer; // vnode applied version for async commit
|
||||
int64_t refId; // shared by fetch tasks
|
||||
volatile int64_t qBufSize; // queue buffer size
|
||||
SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo)
|
||||
int8_t triggerStat; // shared by fetch tasks
|
||||
int8_t commitStat; // 0 not in committing, 1 in committing
|
||||
int8_t execStat; // 0 not in exec , 1 in exec
|
||||
SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w)
|
||||
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
|
||||
SHashObj *iRsmaInfoHash; // key: stbUid, value: SRSmaInfo; immutable rsmaInfoHash
|
||||
SHashObj *infoHash; // key: suid, value: SRSmaInfo
|
||||
SHashObj *fetchHash; // key: suid, value: L1 or L2 or L1|L2
|
||||
};
|
||||
|
||||
struct SSmaStat {
|
||||
|
@ -105,10 +108,10 @@ struct SSmaStat {
|
|||
T_REF_DECLARE()
|
||||
};
|
||||
|
||||
#define SMA_TSMA_STAT(s) (&(s)->tsmaStat)
|
||||
#define SMA_RSMA_STAT(s) (&(s)->rsmaStat)
|
||||
#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash)
|
||||
#define RSMA_IMU_INFO_HASH(r) ((r)->iRsmaInfoHash)
|
||||
#define SMA_STAT_TSMA(s) (&(s)->tsmaStat)
|
||||
#define SMA_STAT_RSMA(s) (&(s)->rsmaStat)
|
||||
#define RSMA_INFO_HASH(r) ((r)->infoHash)
|
||||
#define RSMA_FETCH_HASH(r) ((r)->fetchHash)
|
||||
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
|
||||
#define RSMA_COMMIT_STAT(r) (&(r)->commitStat)
|
||||
#define RSMA_REF_ID(r) ((r)->refId)
|
||||
|
@ -117,6 +120,7 @@ struct SSmaStat {
|
|||
struct SRSmaInfoItem {
|
||||
int8_t level;
|
||||
int8_t triggerStat;
|
||||
uint16_t interval; // second
|
||||
int32_t maxDelay;
|
||||
tmr_h tmrId;
|
||||
};
|
||||
|
@ -125,14 +129,19 @@ struct SRSmaInfo {
|
|||
STSchema *pTSchema;
|
||||
int64_t suid;
|
||||
int64_t refId; // refId of SRSmaStat
|
||||
int8_t delFlag;
|
||||
uint64_t delFlag : 1;
|
||||
uint64_t lastReceived : 63; // second
|
||||
T_REF_DECLARE()
|
||||
SRSmaInfoItem items[TSDB_RETENTION_L2];
|
||||
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
|
||||
void *iTaskInfo[TSDB_RETENTION_L2]; // immutable
|
||||
STaosQueue *queue; // buffer queue of SubmitReq
|
||||
STaosQall *qall; // buffer qall of SubmitReq
|
||||
void *iTaskInfo[TSDB_RETENTION_L2]; // immutable qTaskInfo_t
|
||||
STaosQueue *iQueue; // immutable buffer queue of SubmitReq
|
||||
STaosQall *iQall; // immutable buffer qall of SubmitReq
|
||||
};
|
||||
|
||||
#define RSMA_INFO_HEAD_LEN 32
|
||||
#define RSMA_INFO_HEAD_LEN offsetof(SRSmaInfo, items)
|
||||
#define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1)
|
||||
#define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1)
|
||||
#define RSMA_INFO_QTASK(r, i) ((r)->taskInfo[i])
|
||||
|
@ -161,6 +170,12 @@ enum {
|
|||
RSMA_RESTORE_SYNC = 2,
|
||||
};
|
||||
|
||||
typedef enum {
|
||||
RSMA_EXEC_OVERFLOW = 1, // triggered by queue buf overflow
|
||||
RSMA_EXEC_TIMEOUT = 2, // triggered by timer
|
||||
RSMA_EXEC_COMMIT = 3, // triggered by commit
|
||||
} ERsmaExecType;
|
||||
|
||||
void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
|
||||
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
|
||||
|
||||
|
@ -228,12 +243,13 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) {
|
|||
|
||||
void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName);
|
||||
void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName);
|
||||
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo **pDest, SRSmaInfo *pSrc);
|
||||
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
|
||||
void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
|
||||
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
|
||||
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
|
||||
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
|
||||
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
|
||||
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
|
||||
|
||||
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
|
||||
int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer);
|
||||
|
|
|
@ -65,6 +65,7 @@ struct SVBufPool {
|
|||
SVBufPool* next;
|
||||
SVnode* pVnode;
|
||||
volatile int32_t nRef;
|
||||
TdThreadSpinlock lock;
|
||||
int64_t size;
|
||||
uint8_t* ptr;
|
||||
SVBufPoolNode* pTail;
|
||||
|
|
|
@ -199,6 +199,7 @@ int32_t smaAsyncCommit(SSma* pSma);
|
|||
int32_t smaAsyncPostCommit(SSma* pSma);
|
||||
int32_t smaDoRetention(SSma* pSma, int64_t now);
|
||||
int32_t smaProcessFetch(SSma* pSma, void* pMsg);
|
||||
int32_t smaProcessExec(SSma* pSma, void* pMsg);
|
||||
|
||||
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
|
||||
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
|
||||
|
|
|
@ -83,8 +83,7 @@ int32_t smaBegin(SSma *pSma) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
|
||||
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
|
||||
|
||||
int8_t rsmaTriggerStat =
|
||||
atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED, TASK_TRIGGER_STAT_ACTIVE);
|
||||
|
@ -123,7 +122,7 @@ static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) {
|
|||
}
|
||||
|
||||
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
|
||||
SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat);
|
||||
|
||||
// step 1: set rsma stat paused
|
||||
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
|
||||
|
@ -289,8 +288,7 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(SMA_ENV_STAT(pSmaEnv));
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
|
||||
|
||||
// cleanup outdated qtaskinfo files
|
||||
tdCleanupQTaskInfoFiles(pSma, pRSmaStat);
|
||||
|
@ -299,10 +297,9 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
|
|||
}
|
||||
|
||||
/**
|
||||
* @brief Rsma async commit implementation
|
||||
* @brief Rsma async commit implementation(only do some necessary light weighted task)
|
||||
* 1) set rsma stat TASK_TRIGGER_STAT_PAUSED
|
||||
* 2) Wait all running fetch task finish to fetch and put submitMsg into level 2/3 wQueue(blocking level 1 write)
|
||||
* 3)
|
||||
*
|
||||
* @param pSma
|
||||
* @return int32_t
|
||||
|
@ -314,7 +311,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
|
|||
}
|
||||
|
||||
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
|
||||
SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat);
|
||||
|
||||
// step 1: set rsma stat
|
||||
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
|
||||
|
@ -336,28 +333,55 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
|
|||
}
|
||||
}
|
||||
|
||||
// step 3: swap rsmaInfoHash and iRsmaInfoHash
|
||||
/**
|
||||
* @brief step 3: consume the SubmitReq in buffer
|
||||
* 1) This is high cost task and should not put in asyncPreCommit originally.
|
||||
* 2) But, if put in asyncCommit, would trigger taskInfo cloning frequently.
|
||||
*/
|
||||
nLoops = 0;
|
||||
smaInfo("vgId:%d, start to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
|
||||
|
||||
int8_t old;
|
||||
while (1) {
|
||||
old = atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1);
|
||||
if (old == 0) break;
|
||||
if (++nLoops > 1000) {
|
||||
sched_yield();
|
||||
nLoops = 0;
|
||||
smaDebug("vgId:%d, wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
|
||||
}
|
||||
}
|
||||
|
||||
smaInfo("vgId:%d, end to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
|
||||
|
||||
if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_COMMIT) < 0) {
|
||||
atomic_store_8(&pRSmaStat->execStat, 0);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
// step 4: swap queue/qall and iQueue/iQall
|
||||
// lock
|
||||
taosWLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
|
||||
ASSERT(RSMA_INFO_HASH(pRSmaStat));
|
||||
ASSERT(!RSMA_IMU_INFO_HASH(pRSmaStat));
|
||||
|
||||
RSMA_IMU_INFO_HASH(pRSmaStat) = RSMA_INFO_HASH(pRSmaStat);
|
||||
RSMA_INFO_HASH(pRSmaStat) =
|
||||
taosHashInit(RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
||||
void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
|
||||
|
||||
if (!RSMA_INFO_HASH(pRSmaStat)) {
|
||||
// unlock
|
||||
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
smaError("vgId:%d, rsma async commit failed since %s", SMA_VID(pSma), terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
while (pIter) {
|
||||
SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
|
||||
TSWAP(pInfo->iQall, pInfo->qall);
|
||||
TSWAP(pInfo->iQueue, pInfo->queue);
|
||||
TSWAP(pInfo->iTaskInfo[0], pInfo->taskInfo[0]);
|
||||
TSWAP(pInfo->iTaskInfo[1], pInfo->taskInfo[1]);
|
||||
pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
|
||||
}
|
||||
|
||||
atomic_store_64(&pRSmaStat->qBufSize, 0);
|
||||
atomic_store_8(&pRSmaStat->execStat, 0);
|
||||
// unlock
|
||||
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
|
||||
// step 4: others
|
||||
// step 5: others
|
||||
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -375,17 +399,18 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
|
||||
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
|
||||
|
||||
// perform persist task for qTaskInfo
|
||||
tdRSmaPersistExecImpl(pRSmaStat, RSMA_IMU_INFO_HASH(pRSmaStat));
|
||||
// perform persist task for qTaskInfo operator
|
||||
if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Migrate rsmaInfo from iRsmaInfo to rsmaInfo if rsmaInfoHash not empty.
|
||||
* @brief Migrate rsmaInfo from iRsmaInfo to rsmaInfo if rsma infoHash not empty.
|
||||
*
|
||||
* @param pSma
|
||||
* @return int32_t
|
||||
|
@ -396,37 +421,25 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
|
||||
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
|
||||
SArray *rsmaDeleted = NULL;
|
||||
|
||||
// step 1: merge rsmaInfoHash and iRsmaInfoHash
|
||||
// step 1: merge qTaskInfo and iQTaskInfo
|
||||
// lock
|
||||
taosWLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
#if 0
|
||||
if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) {
|
||||
// just switch the hash pointer if rsmaInfoHash is empty
|
||||
if (taosHashGetSize(RSMA_IMU_INFO_HASH(pRSmaStat)) > 0) {
|
||||
SHashObj *infoHash = RSMA_INFO_HASH(pRSmaStat);
|
||||
RSMA_INFO_HASH(pRSmaStat) = RSMA_IMU_INFO_HASH(pRSmaStat);
|
||||
RSMA_IMU_INFO_HASH(pRSmaStat) = infoHash;
|
||||
}
|
||||
} else {
|
||||
#endif
|
||||
#if 1
|
||||
void *pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), NULL);
|
||||
|
||||
void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
|
||||
while (pIter) {
|
||||
tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
|
||||
|
||||
if (!taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t))) {
|
||||
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
|
||||
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
|
||||
int32_t refVal = T_REF_VAL_GET(pRSmaInfo);
|
||||
if (refVal == 0) {
|
||||
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
|
||||
smaDebug(
|
||||
"vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for "
|
||||
"table:%" PRIi64,
|
||||
SMA_VID(pSma), *pSuid);
|
||||
if (!rsmaDeleted) {
|
||||
if ((rsmaDeleted = taosArrayInit(1, sizeof(tb_uid_t)))) {
|
||||
taosArrayPush(rsmaDeleted, pSuid);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
smaDebug(
|
||||
"vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for "
|
||||
|
@ -434,27 +447,40 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
|
|||
SMA_VID(pSma), refVal, *pSuid);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter);
|
||||
pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
|
||||
continue;
|
||||
}
|
||||
taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
|
||||
smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma),
|
||||
*pSuid);
|
||||
|
||||
if (pRSmaInfo->taskInfo[0]) {
|
||||
if (pRSmaInfo->iTaskInfo[0]) {
|
||||
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pRSmaInfo->iTaskInfo[0];
|
||||
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
|
||||
pRSmaInfo->iTaskInfo[0] = NULL;
|
||||
}
|
||||
} else {
|
||||
// free the resources
|
||||
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
|
||||
tdFreeRSmaInfo(pSma, pRSmaInfo, false);
|
||||
smaDebug("vgId:%d, rsma async post commit, free rsma info since already COW for table:%" PRIi64, SMA_VID(pSma),
|
||||
*pSuid);
|
||||
TSWAP(pRSmaInfo->taskInfo[0], pRSmaInfo->iTaskInfo[0]);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter);
|
||||
}
|
||||
#endif
|
||||
// }
|
||||
taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
|
||||
smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid);
|
||||
|
||||
taosHashCleanup(RSMA_IMU_INFO_HASH(pRSmaStat));
|
||||
RSMA_IMU_INFO_HASH(pRSmaStat) = NULL;
|
||||
pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(rsmaDeleted); ++i) {
|
||||
tb_uid_t *pSuid = taosArrayGet(rsmaDeleted, i);
|
||||
void *pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t));
|
||||
if ((pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
|
||||
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
|
||||
smaDebug(
|
||||
"vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for "
|
||||
"table:%" PRIi64,
|
||||
SMA_VID(pSma), *pSuid);
|
||||
}
|
||||
taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t));
|
||||
}
|
||||
taosArrayDestroy(rsmaDeleted);
|
||||
// TODO: remove suid in files?
|
||||
|
||||
// unlock
|
||||
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
|
|
|
@ -228,7 +228,12 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
|
|||
RSMA_INFO_HASH(pRSmaStat) = taosHashInit(
|
||||
RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
||||
if (!RSMA_INFO_HASH(pRSmaStat)) {
|
||||
taosMemoryFreeClear(*pSmaStat);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
RSMA_FETCH_HASH(pRSmaStat) = taosHashInit(
|
||||
RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
||||
if (!RSMA_FETCH_HASH(pRSmaStat)) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
} else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
|
||||
|
@ -264,8 +269,6 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
|
|||
atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED);
|
||||
|
||||
// step 2: destroy the rsma info and associated fetch tasks
|
||||
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
|
||||
#if 1
|
||||
if (taosHashGetSize(RSMA_INFO_HASH(pStat)) > 0) {
|
||||
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL);
|
||||
while (infoHash) {
|
||||
|
@ -274,10 +277,12 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
|
|||
infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), infoHash);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
taosHashCleanup(RSMA_INFO_HASH(pStat));
|
||||
|
||||
// step 3: wait all triggered fetch tasks finished
|
||||
// step 3: destroy the rsma fetch hash
|
||||
taosHashCleanup(RSMA_FETCH_HASH(pStat));
|
||||
|
||||
// step 4: wait all triggered fetch tasks finished
|
||||
int32_t nLoops = 0;
|
||||
while (1) {
|
||||
if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) {
|
||||
|
@ -293,7 +298,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
|
|||
}
|
||||
}
|
||||
|
||||
// step 4: free pStat
|
||||
// step 5: free pStat
|
||||
taosMemoryFreeClear(pStat);
|
||||
}
|
||||
}
|
||||
|
@ -318,9 +323,9 @@ void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
|
|||
int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
|
||||
if (pSmaStat) {
|
||||
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
|
||||
tdDestroyTSmaStat(SMA_TSMA_STAT(pSmaStat));
|
||||
tdDestroyTSmaStat(SMA_STAT_TSMA(pSmaStat));
|
||||
} else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSmaStat);
|
||||
SRSmaStat *pRSmaStat = &pSmaStat->rsmaStat;
|
||||
int32_t vid = SMA_VID(pRSmaStat->pSma);
|
||||
int64_t refId = RSMA_REF_ID(pRSmaStat);
|
||||
if (taosRemoveRef(smaMgmt.rsetId, RSMA_REF_ID(pRSmaStat)) < 0) {
|
||||
|
|
|
@ -15,8 +15,10 @@
|
|||
|
||||
#include "sma.h"
|
||||
|
||||
#define RSMA_QTASKINFO_BUFSIZE 32768
|
||||
#define RSMA_QTASKINFO_BUFSIZE (32768)
|
||||
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
|
||||
#define RSMA_QTASKEXEC_BUFSIZE (1048576)
|
||||
#define RSMA_SUBMIT_BATCH_SIZE (1024)
|
||||
|
||||
SSmaMgmt smaMgmt = {
|
||||
.inited = 0,
|
||||
|
@ -27,17 +29,20 @@ SSmaMgmt smaMgmt = {
|
|||
#define TD_RSMAINFO_DEL_FILE "rsmainfo.del"
|
||||
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
|
||||
typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter;
|
||||
typedef struct SRSmaExecQItem SRSmaExecQItem;
|
||||
|
||||
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
|
||||
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids);
|
||||
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
|
||||
int8_t idx);
|
||||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid,
|
||||
int8_t level);
|
||||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo,
|
||||
ERsmaExecType type, int8_t level);
|
||||
static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid);
|
||||
static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
|
||||
static void tdFreeRSmaSubmitItems(SArray *pItems);
|
||||
static int32_t tdRSmaConsumeAndFetch(SSma *pSma, int64_t suid, int8_t level, SArray *pSubmitArr);
|
||||
static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
|
||||
int64_t suid, int8_t blkType);
|
||||
int64_t suid);
|
||||
static void tdRSmaFetchTrigger(void *param, void *tmrId);
|
||||
static int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level);
|
||||
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
|
||||
|
@ -76,6 +81,11 @@ struct SRSmaQTaskInfoIter {
|
|||
int32_t nBufPos;
|
||||
};
|
||||
|
||||
struct SRSmaExecQItem {
|
||||
void *pRSmaInfo;
|
||||
void *qall;
|
||||
};
|
||||
|
||||
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName) {
|
||||
tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
|
||||
}
|
||||
|
@ -139,6 +149,18 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
|
|||
if (isDeepFree) {
|
||||
taosMemoryFreeClear(pInfo->pTSchema);
|
||||
}
|
||||
|
||||
if (isDeepFree) {
|
||||
if (pInfo->queue) taosCloseQueue(pInfo->queue);
|
||||
if (pInfo->qall) taosFreeQall(pInfo->qall);
|
||||
if (pInfo->iQueue) taosCloseQueue(pInfo->iQueue);
|
||||
if (pInfo->iQall) taosFreeQall(pInfo->iQall);
|
||||
pInfo->queue = NULL;
|
||||
pInfo->qall = NULL;
|
||||
pInfo->iQueue = NULL;
|
||||
pInfo->iQall = NULL;
|
||||
}
|
||||
|
||||
taosMemoryFree(pInfo);
|
||||
}
|
||||
|
||||
|
@ -179,7 +201,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids)
|
|||
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
if (pRSmaInfo->taskInfo[i]) {
|
||||
if ((qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, true) < 0)) {
|
||||
if (((terrno = qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, true)) < 0)) {
|
||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
||||
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i,
|
||||
terrstr());
|
||||
|
@ -351,6 +373,19 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
|
|||
goto _err;
|
||||
}
|
||||
pRSmaInfo->pTSchema = pTSchema;
|
||||
if (!(pRSmaInfo->queue = taosOpenQueue())) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (!(pRSmaInfo->qall = taosAllocateQall())) {
|
||||
goto _err;
|
||||
}
|
||||
if (!(pRSmaInfo->iQueue = taosOpenQueue())) {
|
||||
goto _err;
|
||||
}
|
||||
if (!(pRSmaInfo->iQall = taosAllocateQall())) {
|
||||
goto _err;
|
||||
}
|
||||
pRSmaInfo->suid = suid;
|
||||
pRSmaInfo->refId = RSMA_REF_ID(pStat);
|
||||
T_REF_INIT_VAL(pRSmaInfo, 1);
|
||||
|
@ -419,8 +454,7 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
|
||||
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
|
||||
|
||||
SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pReq->suid);
|
||||
|
||||
|
@ -528,6 +562,14 @@ void *tdUidStoreFree(STbUidStore *pStore) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief The SubmitReq for rsma L2/L3 is inserted by tsdbInsertData method directly while not by WriteQ, as the queue
|
||||
* would be freed when close Vnode, thus lock should be used if with race condition.
|
||||
* @param pTsdb
|
||||
* @param version
|
||||
* @param pReq
|
||||
* @return int32_t
|
||||
*/
|
||||
static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
|
||||
if (!pReq) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
|
@ -535,7 +577,7 @@ static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
|
|||
}
|
||||
|
||||
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
|
||||
|
||||
// TODO: spin lock for race conditiond
|
||||
if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
@ -569,17 +611,6 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void tdDestroySDataBlockArray(SArray *pArray) {
|
||||
// TODO
|
||||
#if 0
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
||||
SSDataBlock *pDataBlock = taosArrayGet(pArray, i);
|
||||
blockDestroyInner(pDataBlock);
|
||||
}
|
||||
#endif
|
||||
taosArrayDestroy(pArray);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief retention of rsma1/rsma2
|
||||
*
|
||||
|
@ -605,7 +636,7 @@ _end:
|
|||
}
|
||||
|
||||
static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
|
||||
int64_t suid, int8_t blkType) {
|
||||
int64_t suid) {
|
||||
SArray *pResList = taosArrayInit(1, POINTER_BYTES);
|
||||
if (pResList == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -637,8 +668,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm
|
|||
} else {
|
||||
smaDebug("vgId:%d, rsma %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level);
|
||||
}
|
||||
|
||||
#if 1
|
||||
#if 0
|
||||
char flag[10] = {0};
|
||||
snprintf(flag, 10, "level %" PRIi8, pItem->level);
|
||||
blockDebugShowDataBlocks(pResList, flag);
|
||||
|
@ -665,7 +695,6 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm
|
|||
|
||||
smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version:%" PRIi64,
|
||||
SMA_VID(pSma), suid, pItem->level, output->info.version);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -677,34 +706,113 @@ _err:
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid,
|
||||
int8_t level) {
|
||||
/**
|
||||
* @brief Copy msg to rsmaQueueBuffer for batch process
|
||||
*
|
||||
* @param pSma
|
||||
* @param pMsg
|
||||
* @param inputType
|
||||
* @param pInfo
|
||||
* @param suid
|
||||
* @return int32_t
|
||||
*/
|
||||
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo,
|
||||
tb_uid_t suid) {
|
||||
const SSubmitReq *pReq = (const SSubmitReq *)pMsg;
|
||||
|
||||
void *qItem = taosAllocateQitem(pReq->header.contLen, DEF_QITEM);
|
||||
if (!qItem) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
memcpy(qItem, pMsg, pReq->header.contLen);
|
||||
|
||||
taosWriteQitem(pInfo->queue, qItem);
|
||||
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
|
||||
int64_t bufSize = atomic_add_fetch_64(&pRSmaStat->qBufSize, pReq->header.contLen);
|
||||
|
||||
// smoothing consume
|
||||
int32_t n = bufSize / RSMA_QTASKEXEC_BUFSIZE;
|
||||
if (n > 1) {
|
||||
if (n > 10) {
|
||||
n = 10;
|
||||
}
|
||||
taosMsleep(n << 4);
|
||||
if (n > 2) {
|
||||
smaWarn("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma),
|
||||
taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 4);
|
||||
} else {
|
||||
smaDebug("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma),
|
||||
taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 4);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
|
||||
SSubmitMsgIter msgIter = {0};
|
||||
SSubmitBlkIter blkIter = {0};
|
||||
STSRow *row = NULL;
|
||||
if (tInitSubmitMsgIter(pReq, &msgIter) < 0) return -1;
|
||||
while (true) {
|
||||
SSubmitBlk *pBlock = NULL;
|
||||
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
|
||||
if (pBlock == NULL) break;
|
||||
tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
|
||||
while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
|
||||
smaDebug("vgId:%d, numOfRows:%d, suid:%" PRIi64 ", uid:%" PRIi64 ", version:%" PRIi64 ", ts:%" PRIi64,
|
||||
SMA_VID(pSma), msgIter.numOfRows, msgIter.suid, msgIter.uid, pReq->version, row->ts);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief sync mode
|
||||
*
|
||||
* @param pSma
|
||||
* @param pMsg
|
||||
* @param msgSize
|
||||
* @param inputType
|
||||
* @param pInfo
|
||||
* @param type
|
||||
* @param level
|
||||
* @return int32_t
|
||||
*/
|
||||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo,
|
||||
ERsmaExecType type, int8_t level) {
|
||||
int32_t idx = level - 1;
|
||||
if (!pInfo || !RSMA_INFO_QTASK(pInfo, idx)) {
|
||||
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid);
|
||||
|
||||
void *qTaskInfo = (type == RSMA_EXEC_COMMIT) ? RSMA_INFO_IQTASK(pInfo, idx) : RSMA_INFO_QTASK(pInfo, idx);
|
||||
if (!qTaskInfo) {
|
||||
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level,
|
||||
pInfo->suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (!pInfo->pTSchema) {
|
||||
smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid);
|
||||
smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, pInfo->suid);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level,
|
||||
RSMA_INFO_QTASK(pInfo, idx), suid);
|
||||
RSMA_INFO_QTASK(pInfo, idx), pInfo->suid);
|
||||
|
||||
if (qSetMultiStreamInput(RSMA_INFO_QTASK(pInfo, idx), pMsg, 1, inputType) < 0) { // INPUT__DATA_SUBMIT
|
||||
#if 0
|
||||
for (int32_t i = 0; i < msgSize; ++i) {
|
||||
SSubmitReq *pReq = *(SSubmitReq **)((char *)pMsg + i * sizeof(void *));
|
||||
smaDebug("vgId:%d, [%d][%d] version %" PRIi64, SMA_VID(pSma), msgSize, i, pReq->version);
|
||||
tdRsmaPrintSubmitReq(pSma, pReq);
|
||||
}
|
||||
#endif
|
||||
if (qSetMultiStreamInput(qTaskInfo, pMsg, msgSize, inputType) < 0) {
|
||||
smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
|
||||
tdRSmaFetchAndSubmitResult(pSma, RSMA_INFO_QTASK(pInfo, idx), pItem, pInfo->pTSchema, suid,
|
||||
STREAM_INPUT__DATA_SUBMIT);
|
||||
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
|
||||
|
||||
if (smaMgmt.tmrHandle) {
|
||||
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
|
||||
}
|
||||
tdRSmaFetchAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -739,51 +847,20 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
|
|||
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
return NULL;
|
||||
}
|
||||
if (!pRSmaInfo->taskInfo[0]) {
|
||||
if (tdCloneRSmaInfo(pSma, pRSmaInfo) < 0) {
|
||||
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
tdRefRSmaInfo(pSma, pRSmaInfo);
|
||||
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
ASSERT(pRSmaInfo->suid == suid);
|
||||
return pRSmaInfo;
|
||||
}
|
||||
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
|
||||
if (RSMA_COMMIT_STAT(pStat) == 0) { // return NULL if not in committing stat
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// clone the SRSmaInfo from iRsmaInfoHash to rsmaInfoHash if in committing stat
|
||||
SRSmaInfo *pCowRSmaInfo = NULL;
|
||||
// lock
|
||||
taosWLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
if (!(pCowRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)))) { // 2-phase lock
|
||||
void *iRSmaInfo = taosHashGet(RSMA_IMU_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
|
||||
if (iRSmaInfo) {
|
||||
SRSmaInfo *pIRSmaInfo = *(SRSmaInfo **)iRSmaInfo;
|
||||
if (pIRSmaInfo && !RSMA_INFO_IS_DEL(pIRSmaInfo)) {
|
||||
if (tdCloneRSmaInfo(pSma, &pCowRSmaInfo, pIRSmaInfo) < 0) {
|
||||
// unlock
|
||||
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr());
|
||||
return NULL;
|
||||
}
|
||||
smaDebug("vgId:%d, clone rsma info succeed for suid:%" PRIu64, SMA_VID(pSma), suid);
|
||||
if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pCowRSmaInfo, sizeof(pCowRSmaInfo)) < 0) {
|
||||
// unlock
|
||||
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr());
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
pCowRSmaInfo = *(SRSmaInfo **)pCowRSmaInfo;
|
||||
ASSERT(!pCowRSmaInfo);
|
||||
}
|
||||
|
||||
if (pCowRSmaInfo) {
|
||||
tdRefRSmaInfo(pSma, pCowRSmaInfo);
|
||||
}
|
||||
// unlock
|
||||
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
return pCowRSmaInfo;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
|
||||
|
@ -792,22 +869,90 @@ static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) {
|
||||
/**
|
||||
* @brief async mode
|
||||
*
|
||||
* @param pSma
|
||||
* @param pMsg
|
||||
* @param inputType
|
||||
* @param suid
|
||||
* @return int32_t
|
||||
*/
|
||||
static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) {
|
||||
SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid);
|
||||
if (!pRSmaInfo) {
|
||||
smaError("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
|
||||
smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
|
||||
tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo, suid, TSDB_RETENTION_L1);
|
||||
tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo, suid, TSDB_RETENTION_L2);
|
||||
if (tdExecuteRSmaImplAsync(pSma, pMsg, inputType, pRSmaInfo, suid) < 0) {
|
||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
if (smaMgmt.tmrHandle) {
|
||||
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, 0);
|
||||
if (pItem->level > 0) {
|
||||
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
|
||||
}
|
||||
pItem = RSMA_INFO_ITEM(pRSmaInfo, 1);
|
||||
if (pItem->level > 0) {
|
||||
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t tdRSmaExecCheck(SSma *pSma) {
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
|
||||
int64_t bufSize = atomic_load_64(&pRSmaStat->qBufSize);
|
||||
|
||||
if (bufSize < RSMA_QTASKEXEC_BUFSIZE) {
|
||||
smaDebug("vgId:%d, bufSize is %d but has no chance to exec as less than %d", SMA_VID(pSma), bufSize,
|
||||
RSMA_QTASKEXEC_BUFSIZE);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1) == 1) {
|
||||
smaDebug("vgId:%d, bufSize is %d but has no chance to exec as qTaskInfo occupied by another task", SMA_VID(pSma),
|
||||
bufSize);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
smaDebug("vgId:%d, bufSize is %d and has chance to exec as qTaskInfo is free now", SMA_VID(pSma), bufSize);
|
||||
|
||||
SRSmaExecMsg fetchMsg;
|
||||
int32_t contLen = sizeof(SMsgHead);
|
||||
void *pBuf = rpcMallocCont(0 + contLen);
|
||||
|
||||
((SMsgHead *)pBuf)->vgId = SMA_VID(pSma);
|
||||
((SMsgHead *)pBuf)->contLen = sizeof(SMsgHead);
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
.code = 0,
|
||||
.msgType = TDMT_VND_EXEC_RSMA,
|
||||
.pCont = pBuf,
|
||||
.contLen = contLen,
|
||||
};
|
||||
|
||||
if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, QUERY_QUEUE, &rpcMsg)) != 0) {
|
||||
smaError("vgId:%d, failed to put rsma exec msg into query-queue since %s", SMA_VID(pSma), terrstr());
|
||||
goto _err;
|
||||
}
|
||||
|
||||
smaDebug("vgId:%d, success to put rsma fetch msg into query-queue", SMA_VID(pSma));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
atomic_store_8(&pRSmaStat->execStat, 0);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
|
||||
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||
if (!pEnv) {
|
||||
|
@ -826,45 +971,62 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
|
|||
tdFetchSubmitReqSuids(pMsg, &uidStore);
|
||||
|
||||
if (uidStore.suid != 0) {
|
||||
tdExecuteRSma(pSma, pMsg, inputType, uidStore.suid);
|
||||
tdExecuteRSmaAsync(pSma, pMsg, inputType, uidStore.suid);
|
||||
|
||||
void *pIter = taosHashIterate(uidStore.uidHash, NULL);
|
||||
while (pIter) {
|
||||
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
|
||||
tdExecuteRSma(pSma, pMsg, inputType, *pTbSuid);
|
||||
tdExecuteRSmaAsync(pSma, pMsg, inputType, *pTbSuid);
|
||||
pIter = taosHashIterate(uidStore.uidHash, pIter);
|
||||
}
|
||||
|
||||
tdUidStoreDestory(&uidStore);
|
||||
|
||||
tdRSmaExecCheck(pSma);
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief retrieve rsma meta and init
|
||||
*
|
||||
* @param pSma
|
||||
* @param nTables number of tables of rsma
|
||||
* @return int32_t
|
||||
*/
|
||||
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
|
||||
SVnode *pVnode = pSma->pVnode;
|
||||
SArray *suidList = NULL;
|
||||
STbUidStore uidStore = {0};
|
||||
SMetaReader mr = {0};
|
||||
|
||||
SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t));
|
||||
if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) {
|
||||
taosArrayDestroy(suidList);
|
||||
if (!(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (vnodeGetStbIdList(pSma->pVnode, 0, suidList) < 0) {
|
||||
smaError("vgId:%d, failed to restore rsma env since get stb id list error: %s", TD_VID(pVnode), terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
int64_t arrSize = taosArrayGetSize(suidList);
|
||||
|
||||
if (nTables) {
|
||||
*nTables = arrSize;
|
||||
}
|
||||
|
||||
if (arrSize == 0) {
|
||||
if (nTables) {
|
||||
*nTables = 0;
|
||||
}
|
||||
taosArrayDestroy(suidList);
|
||||
smaDebug("vgId:%d, no need to restore rsma env since empty stb id list", TD_VID(pVnode));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SMetaReader mr = {0};
|
||||
int64_t nRsmaTables = 0;
|
||||
metaReaderInit(&mr, SMA_META(pSma), 0);
|
||||
if (!(uidStore.tbUids = taosArrayInit(1024, sizeof(tb_uid_t)))) {
|
||||
goto _err;
|
||||
}
|
||||
for (int64_t i = 0; i < arrSize; ++i) {
|
||||
tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
|
||||
smaDebug("vgId:%d, rsma restore, suid is %" PRIi64, TD_VID(pVnode), suid);
|
||||
|
@ -877,6 +1039,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
|
|||
ASSERT(mr.me.type == TSDB_SUPER_TABLE);
|
||||
ASSERT(mr.me.uid == suid);
|
||||
if (TABLE_IS_ROLLUP(mr.me.flags)) {
|
||||
++nRsmaTables;
|
||||
SRSmaParam *param = &mr.me.stbEntry.rsmaParam;
|
||||
for (int i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
smaDebug("vgId:%d, rsma restore, table:%" PRIi64 " level:%d, maxdelay:%" PRIi64 " watermark:%" PRIi64
|
||||
|
@ -887,17 +1050,40 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
|
|||
smaError("vgId:%d, rsma restore env failed for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr());
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// reload all ctbUids for suid
|
||||
uidStore.suid = suid;
|
||||
if (vnodeGetCtbIdList(pVnode, suid, uidStore.tbUids) < 0) {
|
||||
smaError("vgId:%d, rsma restore, get ctb idlist failed for %" PRIi64 " since %s", TD_VID(pVnode), suid,
|
||||
terrstr());
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (tdUpdateTbUidList(pVnode->pSma, &uidStore) < 0) {
|
||||
smaError("vgId:%d, rsma restore, update tb uid list failed for %" PRIi64 " since %s", TD_VID(pVnode), suid,
|
||||
terrstr());
|
||||
goto _err;
|
||||
}
|
||||
|
||||
taosArrayClear(uidStore.tbUids);
|
||||
|
||||
smaDebug("vgId:%d, rsma restore env success for %" PRIi64, TD_VID(pVnode), suid);
|
||||
}
|
||||
}
|
||||
|
||||
metaReaderClear(&mr);
|
||||
taosArrayDestroy(suidList);
|
||||
tdUidStoreDestory(&uidStore);
|
||||
|
||||
if (nTables) {
|
||||
*nTables = nRsmaTables;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
metaReaderClear(&mr);
|
||||
taosArrayDestroy(suidList);
|
||||
tdUidStoreDestory(&uidStore);
|
||||
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
@ -1230,7 +1416,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
|||
}
|
||||
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pRSmaInfo, i);
|
||||
qTaskInfo_t taskInfo = RSMA_INFO_IQTASK(pRSmaInfo, i);
|
||||
if (!taskInfo) {
|
||||
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1);
|
||||
continue;
|
||||
|
@ -1388,7 +1574,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
|||
}
|
||||
|
||||
_end:
|
||||
// taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
|
||||
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
|
||||
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
|
||||
}
|
||||
|
||||
|
@ -1400,7 +1586,7 @@ _end:
|
|||
* @param level
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) {
|
||||
static int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) {
|
||||
SRSmaFetchMsg fetchMsg = {.suid = pInfo->suid, .level = level};
|
||||
int32_t ret = 0;
|
||||
int32_t contLen = 0;
|
||||
|
@ -1427,7 +1613,7 @@ int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) {
|
|||
.code = 0,
|
||||
.msgType = TDMT_VND_FETCH_RSMA,
|
||||
.pCont = pBuf,
|
||||
.contLen = contLen,
|
||||
.contLen = contLen + sizeof(SMsgHead),
|
||||
};
|
||||
|
||||
if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, QUERY_QUEUE, &rpcMsg)) != 0) {
|
||||
|
@ -1456,9 +1642,7 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) {
|
|||
SRSmaFetchMsg req = {0};
|
||||
SDecoder decoder = {0};
|
||||
void *pBuf = NULL;
|
||||
SRSmaInfo *pInfo = NULL;
|
||||
SRSmaInfoItem *pItem = NULL;
|
||||
|
||||
SRSmaStat *pRSmaStat = NULL;
|
||||
if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) {
|
||||
terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP;
|
||||
goto _err;
|
||||
|
@ -1472,37 +1656,265 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
pInfo = tdAcquireRSmaInfoBySuid(pSma, req.suid);
|
||||
if (!pInfo) {
|
||||
if (terrno == TSDB_CODE_SUCCESS) {
|
||||
terrno = TSDB_CODE_RSMA_EMPTY_INFO;
|
||||
}
|
||||
smaWarn("vgId:%d, failed to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8 " since %s", SMA_VID(pSma),
|
||||
req.suid, req.level, terrstr());
|
||||
pRSmaStat = SMA_RSMA_STAT(pSma);
|
||||
|
||||
if (atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1) == 0) {
|
||||
SArray *pSubmitArr = NULL;
|
||||
if (!(pSubmitArr = taosArrayInit(RSMA_SUBMIT_BATCH_SIZE, POINTER_BYTES))) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
atomic_store_8(&pRSmaStat->execStat, 0);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pItem = RSMA_INFO_ITEM(pInfo, req.level - 1);
|
||||
|
||||
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
|
||||
qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, req.level - 1);
|
||||
if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) {
|
||||
goto _err;
|
||||
tdRSmaConsumeAndFetch(pSma, req.suid, req.level, pSubmitArr);
|
||||
atomic_store_8(&pRSmaStat->execStat, 0);
|
||||
taosArrayDestroy(pSubmitArr);
|
||||
} else {
|
||||
int8_t level = req.level;
|
||||
int8_t *val = taosHashGet(RSMA_FETCH_HASH(pRSmaStat), &req.suid, sizeof(req.suid));
|
||||
if (val) {
|
||||
level |= (*val);
|
||||
}
|
||||
if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid, STREAM_INPUT__DATA_BLOCK) < 0) {
|
||||
goto _err;
|
||||
ASSERT(level >= 1 && level <= 3);
|
||||
taosHashPut(RSMA_FETCH_HASH(pRSmaStat), &req.suid, sizeof(req.suid), &level, sizeof(level));
|
||||
}
|
||||
|
||||
tdCleanupStreamInputDataBlock(taskInfo);
|
||||
|
||||
tdReleaseRSmaInfo(pSma, pInfo);
|
||||
tDecoderClear(&decoder);
|
||||
smaDebug("vgId:%d, success to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma), req.suid,
|
||||
req.level);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
tdReleaseRSmaInfo(pSma, pInfo);
|
||||
tDecoderClear(&decoder);
|
||||
smaError("vgId:%d, failed to process rsma fetch msg since %s", SMA_VID(pSma), terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
static void tdFreeRSmaSubmitItems(SArray *pItems) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) {
|
||||
taosFreeQitem(*(void **)taosArrayGet(pItems, i));
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t tdRSmaConsumeAndFetch(SSma *pSma, int64_t suid, int8_t level, SArray *pSubmitArr) {
|
||||
SRSmaInfo *pInfo = tdAcquireRSmaInfoBySuid(pSma, suid);
|
||||
if (!pInfo) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// step 1: consume submit req
|
||||
int64_t qMemSize = 0;
|
||||
if ((qMemSize = taosQueueMemorySize(pInfo->queue) > 0)) {
|
||||
taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock
|
||||
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
|
||||
atomic_fetch_sub_64(&pRSmaStat->qBufSize, qMemSize);
|
||||
|
||||
taosArrayClear(pSubmitArr);
|
||||
|
||||
while (1) {
|
||||
void *msg = NULL;
|
||||
taosGetQitem(pInfo->qall, (void **)&msg);
|
||||
if (msg) {
|
||||
if (taosArrayPush(pSubmitArr, &msg) < 0) {
|
||||
tdFreeRSmaSubmitItems(pSubmitArr);
|
||||
goto _err;
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t size = taosArrayGetSize(pSubmitArr);
|
||||
if (size > 0) {
|
||||
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
|
||||
if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, RSMA_EXEC_TIMEOUT, i) <
|
||||
0) {
|
||||
tdFreeRSmaSubmitItems(pSubmitArr);
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
tdFreeRSmaSubmitItems(pSubmitArr);
|
||||
}
|
||||
}
|
||||
|
||||
// step 2: fetch rsma result
|
||||
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
|
||||
for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
|
||||
if (level & i) {
|
||||
qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, i - 1);
|
||||
if (!taskInfo) {
|
||||
continue;
|
||||
}
|
||||
if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1);
|
||||
if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, suid) < 0) {
|
||||
tdCleanupStreamInputDataBlock(taskInfo);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
tdCleanupStreamInputDataBlock(taskInfo);
|
||||
}
|
||||
}
|
||||
|
||||
_end:
|
||||
tdReleaseRSmaInfo(pSma, pInfo);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
tdReleaseRSmaInfo(pSma, pInfo);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief
|
||||
*
|
||||
* @param pSma
|
||||
* @param type
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
|
||||
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
|
||||
SHashObj *infoHash = NULL;
|
||||
SArray *pSubmitQArr = NULL;
|
||||
SArray *pSubmitArr = NULL;
|
||||
bool isFetchAll = false;
|
||||
|
||||
if (!pRSmaStat || !(infoHash = RSMA_INFO_HASH(pRSmaStat))) {
|
||||
terrno = TSDB_CODE_RSMA_INVALID_STAT;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (type == RSMA_EXEC_OVERFLOW) {
|
||||
taosRLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
if (atomic_load_64(&pRSmaStat->qBufSize) < RSMA_QTASKEXEC_BUFSIZE) {
|
||||
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
}
|
||||
|
||||
if (!(pSubmitQArr = taosArrayInit(taosHashGetSize(infoHash), sizeof(SRSmaExecQItem)))) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (!(pSubmitArr = taosArrayInit(RSMA_SUBMIT_BATCH_SIZE, POINTER_BYTES))) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// step 1: rsma exec - consume data in buffer queue for all suids
|
||||
SRSmaExecQItem qItem = {0};
|
||||
void *pIter = taosHashIterate(infoHash, NULL); // infoHash has r/w lock
|
||||
if (type == RSMA_EXEC_OVERFLOW) {
|
||||
while (pIter) {
|
||||
SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
|
||||
if (taosQueueItemSize(pInfo->queue)) {
|
||||
taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock
|
||||
qItem.qall = &pInfo->qall;
|
||||
qItem.pRSmaInfo = pIter;
|
||||
taosArrayPush(pSubmitQArr, &qItem);
|
||||
}
|
||||
ASSERT(taosQueueItemSize(pInfo->queue) == 0);
|
||||
pIter = taosHashIterate(infoHash, pIter);
|
||||
}
|
||||
} else if (type == RSMA_EXEC_COMMIT) {
|
||||
while (pIter) {
|
||||
SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
|
||||
if (taosQueueItemSize(pInfo->iQueue)) {
|
||||
taosReadAllQitems(pInfo->iQueue, pInfo->iQall);
|
||||
qItem.qall = &pInfo->iQall;
|
||||
qItem.pRSmaInfo = pIter;
|
||||
taosArrayPush(pSubmitQArr, &qItem);
|
||||
}
|
||||
ASSERT(taosQueueItemSize(pInfo->iQueue) == 0);
|
||||
pIter = taosHashIterate(infoHash, pIter);
|
||||
}
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
atomic_store_64(&pRSmaStat->qBufSize, 0);
|
||||
|
||||
int32_t qSize = taosArrayGetSize(pSubmitQArr);
|
||||
for (int32_t i = 0; i < qSize; ++i) {
|
||||
SRSmaExecQItem *pItem = taosArrayGet(pSubmitQArr, i);
|
||||
while (1) {
|
||||
void *msg = NULL;
|
||||
taosGetQitem(*(STaosQall **)pItem->qall, (void **)&msg);
|
||||
if (msg) {
|
||||
if (taosArrayPush(pSubmitArr, &msg) < 0) {
|
||||
tdFreeRSmaSubmitItems(pSubmitArr);
|
||||
goto _err;
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t size = taosArrayGetSize(pSubmitArr);
|
||||
if (size > 0) {
|
||||
SRSmaInfo *pInfo = *(SRSmaInfo **)pItem->pRSmaInfo;
|
||||
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
|
||||
if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) {
|
||||
tdFreeRSmaSubmitItems(pSubmitArr);
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
tdFreeRSmaSubmitItems(pSubmitArr);
|
||||
taosArrayClear(pSubmitArr);
|
||||
}
|
||||
}
|
||||
|
||||
// step 2: rsma fetch - consume data in buffer queue for suids triggered by timer
|
||||
if (taosHashGetSize(RSMA_FETCH_HASH(pRSmaStat)) <= 0) {
|
||||
goto _end;
|
||||
}
|
||||
pIter = taosHashIterate(RSMA_FETCH_HASH(pRSmaStat), NULL);
|
||||
if (pIter) {
|
||||
tdRSmaConsumeAndFetch(pSma, *(int64_t *)taosHashGetKey(pIter, NULL), *(int8_t *)pIter, pSubmitArr);
|
||||
while ((pIter = taosHashIterate(RSMA_FETCH_HASH(pRSmaStat), pIter))) {
|
||||
tdRSmaConsumeAndFetch(pSma, *(int64_t *)taosHashGetKey(pIter, NULL), *(int8_t *)pIter, pSubmitArr);
|
||||
}
|
||||
}
|
||||
|
||||
_end:
|
||||
taosArrayDestroy(pSubmitArr);
|
||||
taosArrayDestroy(pSubmitQArr);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
taosArrayDestroy(pSubmitArr);
|
||||
taosArrayDestroy(pSubmitQArr);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief exec rsma level 1data, fetch result of level 2/3 and submit
|
||||
*
|
||||
* @param pSma
|
||||
* @param pMsg
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t smaProcessExec(SSma *pSma, void *pMsg) {
|
||||
SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg;
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
|
||||
|
||||
if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) {
|
||||
terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP;
|
||||
goto _err;
|
||||
}
|
||||
smaDebug("vgId:%d, begin to process rsma exec msg by TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
|
||||
if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_OVERFLOW) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
atomic_store_8(&pRSmaStat->execStat, 0);
|
||||
smaDebug("vgId:%d, success to process rsma exec msg by TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
atomic_store_8(&pRSmaStat->execStat, 0);
|
||||
smaError("vgId:%d, failed to process rsma exec msg by TID:%p since %s", SMA_VID(pSma), (void *)taosGetSelfPthreadId(),
|
||||
terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
|
|
@ -139,7 +139,6 @@ static int32_t rsmaSnapReadQTaskInfo(SRsmaSnapReader* pReader, uint8_t** ppBuf)
|
|||
|
||||
smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo, size:%" PRIi64, SMA_VID(pSma), size);
|
||||
|
||||
|
||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppBuf);
|
||||
pHdr->type = SNAP_DATA_QTASK;
|
||||
pHdr->size = size;
|
||||
|
@ -279,7 +278,8 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWrit
|
|||
TdFilePtr qTaskF = taosCreateFile(qTaskInfoFullName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (!qTaskF) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
smaError("vgId:%d, rsma snapshot writer open %s failed since %s", TD_VID(pSma->pVnode), qTaskInfoFullName, tstrerror(code));
|
||||
smaError("vgId:%d, rsma snapshot writer open %s failed since %s", TD_VID(pSma->pVnode), qTaskInfoFullName,
|
||||
tstrerror(code));
|
||||
goto _err;
|
||||
}
|
||||
qWriter->pWriteH = qTaskF;
|
||||
|
@ -309,7 +309,7 @@ int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) {
|
|||
if (rollback) {
|
||||
// TODO: rsma1/rsma2
|
||||
// qtaskinfo
|
||||
if(pWriter->pQTaskFWriter) {
|
||||
if (pWriter->pQTaskFWriter) {
|
||||
taosRemoveFile(pWriter->pQTaskFWriter->fname);
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -175,7 +175,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
|||
}
|
||||
|
||||
tdRefSmaStat(pSma, pStat);
|
||||
pTsmaStat = SMA_TSMA_STAT(pStat);
|
||||
pTsmaStat = SMA_STAT_TSMA(pStat);
|
||||
|
||||
if (!pTsmaStat->pTSma) {
|
||||
STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid);
|
||||
|
|
|
@ -350,49 +350,45 @@ _err:
|
|||
}
|
||||
|
||||
/**
|
||||
* @brief pTSchema is shared
|
||||
* @brief Clone qTaskInfo of SRSmaInfo
|
||||
*
|
||||
* @param pSma
|
||||
* @param pDest
|
||||
* @param pSrc
|
||||
* @param pInfo
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo **pDest, SRSmaInfo *pSrc) {
|
||||
SVnode *pVnode = pSma->pVnode;
|
||||
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
|
||||
SRSmaParam *param = NULL;
|
||||
if (!pSrc) {
|
||||
*pDest = NULL;
|
||||
if (!pInfo) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, SMA_META(pSma), 0);
|
||||
smaDebug("vgId:%d, rsma clone, suid is %" PRIi64, TD_VID(pVnode), pSrc->suid);
|
||||
if (metaGetTableEntryByUid(&mr, pSrc->suid) < 0) {
|
||||
smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), pSrc->suid,
|
||||
smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid);
|
||||
if (metaGetTableEntryByUid(&mr, pInfo->suid) < 0) {
|
||||
smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid,
|
||||
terrstr());
|
||||
goto _err;
|
||||
}
|
||||
ASSERT(mr.me.type == TSDB_SUPER_TABLE);
|
||||
ASSERT(mr.me.uid == pSrc->suid);
|
||||
ASSERT(mr.me.uid == pInfo->suid);
|
||||
if (TABLE_IS_ROLLUP(mr.me.flags)) {
|
||||
param = &mr.me.stbEntry.rsmaParam;
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
if (tdCloneQTaskInfo(pSma, pSrc->iTaskInfo[i], pSrc->taskInfo[i], param, pSrc->suid, i) < 0) {
|
||||
if (tdCloneQTaskInfo(pSma, pInfo->taskInfo[i], pInfo->iTaskInfo[i], param, pInfo->suid, i) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
smaDebug("vgId:%d, rsma clone env success for %" PRIi64, TD_VID(pVnode), pSrc->suid);
|
||||
smaDebug("vgId:%d, rsma clone env success for %" PRIi64, SMA_VID(pSma), pInfo->suid);
|
||||
} else {
|
||||
terrno = TSDB_CODE_RSMA_INVALID_SCHEMA;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
metaReaderClear(&mr);
|
||||
|
||||
*pDest = pSrc; // pointer copy
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
*pDest = NULL;
|
||||
metaReaderClear(&mr);
|
||||
smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", TD_VID(pVnode), pSrc->suid, terrstr());
|
||||
smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid, terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
|
@ -569,8 +569,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
|
|||
}
|
||||
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (pTask->taskLevel == TASK_LEVEL__AGG) {
|
||||
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
|
||||
}
|
||||
|
@ -581,8 +579,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
|
|||
pTask->outputQueue = streamQueueOpen();
|
||||
|
||||
if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
|
||||
code = -1;
|
||||
goto FAIL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
||||
|
@ -627,14 +624,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
|
|||
|
||||
streamSetupTrigger(pTask);
|
||||
|
||||
tqInfo("deploy stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId,
|
||||
tqInfo("expand stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId,
|
||||
pTask->selfChildId);
|
||||
|
||||
FAIL:
|
||||
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
|
||||
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
|
||||
// TODO free executor
|
||||
return code;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||
|
|
|
@ -231,11 +231,12 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
|||
|
||||
ASSERT(pTask->tbSink.pTSchema);
|
||||
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
|
||||
SSubmitReq* pReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
|
||||
SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
|
||||
pTask->tbSink.stbFullName, &deleteReq);
|
||||
|
||||
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);
|
||||
|
||||
if (taosArrayGetSize(deleteReq.deleteReqs) != 0) {
|
||||
int32_t code;
|
||||
int32_t len;
|
||||
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
|
||||
|
@ -244,21 +245,21 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
|||
ASSERT(0);
|
||||
}
|
||||
SEncoder encoder;
|
||||
void* buf = rpcMallocCont(len + sizeof(SMsgHead));
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
|
||||
void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
|
||||
tEncoderInit(&encoder, abuf, len);
|
||||
tEncodeSBatchDeleteReq(&encoder, &deleteReq);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
((SMsgHead*)buf)->vgId = pVnode->config.vgId;
|
||||
((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId;
|
||||
|
||||
if (taosArrayGetSize(deleteReq.deleteReqs) != 0) {
|
||||
SRpcMsg msg = {
|
||||
.msgType = TDMT_VND_BATCH_DEL,
|
||||
.pCont = buf,
|
||||
.pCont = serializedDeleteReq,
|
||||
.contLen = len + sizeof(SMsgHead),
|
||||
};
|
||||
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
|
||||
rpcFreeCont(serializedDeleteReq);
|
||||
tqDebug("failed to put into write-queue since %s", terrstr());
|
||||
}
|
||||
}
|
||||
|
@ -268,11 +269,12 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
|||
// build write msg
|
||||
SRpcMsg msg = {
|
||||
.msgType = TDMT_VND_SUBMIT,
|
||||
.pCont = pReq,
|
||||
.contLen = ntohl(pReq->length),
|
||||
.pCont = submitReq,
|
||||
.contLen = ntohl(submitReq->length),
|
||||
};
|
||||
|
||||
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
|
||||
rpcFreeCont(submitReq);
|
||||
tqDebug("failed to put into write-queue since %s", terrstr());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2585,32 +2585,6 @@ void* tsdbGetIvtIdx(SMeta* pMeta) {
|
|||
|
||||
uint64_t getReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; }
|
||||
|
||||
/**
|
||||
* @brief Get all suids since suid
|
||||
*
|
||||
* @param pMeta
|
||||
* @param suid return all suids in one vnode if suid is 0
|
||||
* @param list
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
|
||||
SMStbCursor* pCur = metaOpenStbCursor(pMeta, suid);
|
||||
if (!pCur) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
tb_uid_t id = metaStbCursorNext(pCur);
|
||||
if (id == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
taosArrayPush(list, &id);
|
||||
}
|
||||
|
||||
metaCloseStbCursor(pCur);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// ====================================== EXPOSED APIs ======================================
|
||||
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
|
||||
|
|
|
@ -78,7 +78,7 @@ void vnodeBufPoolReset(SVBufPool *pPool) {
|
|||
void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
|
||||
SVBufPoolNode *pNode;
|
||||
void *p;
|
||||
|
||||
taosThreadSpinLock(&pPool->lock);
|
||||
if (pPool->node.size >= pPool->ptr - pPool->node.data + size) {
|
||||
// allocate from the anchor node
|
||||
p = pPool->ptr;
|
||||
|
@ -89,6 +89,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
|
|||
pNode = taosMemoryMalloc(sizeof(*pNode) + size);
|
||||
if (pNode == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosThreadSpinUnlock(&pPool->lock);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -101,7 +102,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
|
|||
|
||||
pPool->size = pPool->size + sizeof(*pNode) + size;
|
||||
}
|
||||
|
||||
taosThreadSpinUnlock(&pPool->lock);
|
||||
return p;
|
||||
}
|
||||
|
||||
|
@ -129,6 +130,12 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool)
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (taosThreadSpinInit(&pPool->lock, 0) != 0) {
|
||||
taosMemoryFree(pPool);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pPool->next = NULL;
|
||||
pPool->pVnode = pVnode;
|
||||
pPool->nRef = 0;
|
||||
|
@ -145,6 +152,7 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool)
|
|||
|
||||
static int vnodeBufPoolDestroy(SVBufPool *pPool) {
|
||||
vnodeBufPoolReset(pPool);
|
||||
taosThreadSpinDestroy(&pPool->lock);
|
||||
taosMemoryFree(pPool);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -220,6 +220,10 @@ int vnodeCommit(SVnode *pVnode) {
|
|||
vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID,
|
||||
pVnode->state.applied);
|
||||
|
||||
// preCommit
|
||||
// smaSyncPreCommit(pVnode->pSma);
|
||||
smaAsyncPreCommit(pVnode->pSma);
|
||||
|
||||
vnodeBufPoolUnRef(pVnode->inUse);
|
||||
pVnode->inUse = NULL;
|
||||
|
||||
|
@ -237,10 +241,6 @@ int vnodeCommit(SVnode *pVnode) {
|
|||
}
|
||||
walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
|
||||
|
||||
// preCommit
|
||||
// smaSyncPreCommit(pVnode->pSma);
|
||||
smaAsyncPreCommit(pVnode->pSma);
|
||||
|
||||
// commit each sub-system
|
||||
if (metaCommit(pVnode->pMeta) < 0) {
|
||||
ASSERT(0);
|
||||
|
|
|
@ -424,6 +424,25 @@ int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t vnodeGetStbIdList(SVnode* pVnode, int64_t suid, SArray* list) {
|
||||
SMStbCursor* pCur = metaOpenStbCursor(pVnode->pMeta, suid);
|
||||
if (!pCur) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
tb_uid_t id = metaStbCursorNext(pCur);
|
||||
if (id == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
taosArrayPush(list, &id);
|
||||
}
|
||||
|
||||
metaCloseStbCursor(pCur);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
|
||||
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, suid);
|
||||
if (!pCur) {
|
||||
|
|
|
@ -303,6 +303,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
|
||||
case TDMT_VND_FETCH_RSMA:
|
||||
return smaProcessFetch(pVnode->pSma, pMsg);
|
||||
case TDMT_VND_EXEC_RSMA:
|
||||
return smaProcessExec(pVnode->pSma, pMsg);
|
||||
default:
|
||||
vError("unknown msg type:%d in query queue", pMsg->msgType);
|
||||
return TSDB_CODE_VND_APP_ERROR;
|
||||
|
@ -530,7 +532,9 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
|
|||
}
|
||||
|
||||
tqUpdateTbUidList(pVnode->pTq, tbUids, true);
|
||||
tdUpdateTbUidList(pVnode->pSma, pStore);
|
||||
if (tdUpdateTbUidList(pVnode->pSma, pStore) < 0) {
|
||||
goto _exit;
|
||||
}
|
||||
tdUidStoreFree(pStore);
|
||||
|
||||
// prepare rsp
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
#define TDENGINE_TSIMPLEHASH_H
|
||||
|
||||
#include "tarray.h"
|
||||
#include "tlockfree.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -27,6 +26,10 @@ typedef uint32_t (*_hash_fn_t)(const char *, uint32_t);
|
|||
typedef int32_t (*_equal_fn_t)(const void *, const void *, size_t len);
|
||||
typedef void (*_hash_free_fn_t)(void *);
|
||||
|
||||
/**
|
||||
* @brief single thread hash
|
||||
*
|
||||
*/
|
||||
typedef struct SSHashObj SSHashObj;
|
||||
|
||||
/**
|
||||
|
@ -36,7 +39,7 @@ typedef struct SSHashObj SSHashObj;
|
|||
* @param fn hash function to generate the hash value
|
||||
* @return
|
||||
*/
|
||||
SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t dataLen);
|
||||
SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn);
|
||||
|
||||
/**
|
||||
* return the size of hash table
|
||||
|
@ -48,22 +51,26 @@ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj);
|
|||
int32_t tSimpleHashPrint(const SSHashObj *pHashObj);
|
||||
|
||||
/**
|
||||
* put element into hash table, if the element with the same key exists, update it
|
||||
* @brief put element into hash table, if the element with the same key exists, update it
|
||||
*
|
||||
* @param pHashObj
|
||||
* @param key
|
||||
* @param keyLen
|
||||
* @param data
|
||||
* @return
|
||||
* @param dataLen
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data);
|
||||
int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t dataLen);
|
||||
|
||||
/**
|
||||
* return the payload data with the specified key
|
||||
*
|
||||
* @param pHashObj
|
||||
* @param key
|
||||
* @param keyLen
|
||||
* @return
|
||||
*/
|
||||
void *tSimpleHashGet(SSHashObj *pHashObj, const void *key);
|
||||
void *tSimpleHashGet(SSHashObj *pHashObj, const void *key, size_t keyLen);
|
||||
|
||||
/**
|
||||
* remove item with the specified key
|
||||
|
@ -71,7 +78,7 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key);
|
|||
* @param key
|
||||
* @param keyLen
|
||||
*/
|
||||
int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key);
|
||||
int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen);
|
||||
|
||||
/**
|
||||
* Clear the hash table.
|
||||
|
@ -98,7 +105,7 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj);
|
|||
* @param keyLen
|
||||
* @return
|
||||
*/
|
||||
void *tSimpleHashGetKey(const SSHashObj* pHashObj, void *data, size_t* keyLen);
|
||||
void *tSimpleHashGetKey(void *data, size_t* keyLen);
|
||||
|
||||
/**
|
||||
* Create the hash table iterator
|
||||
|
@ -109,6 +116,17 @@ void *tSimpleHashGetKey(const SSHashObj* pHashObj, void *data, size_t* keyLen);
|
|||
*/
|
||||
void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter);
|
||||
|
||||
/**
|
||||
* Create the hash table iterator
|
||||
*
|
||||
* @param pHashObj
|
||||
* @param data
|
||||
* @param key
|
||||
* @param iter
|
||||
* @return void*
|
||||
*/
|
||||
void *tSimpleHashIterateKV(const SSHashObj *pHashObj, void *data, void **key, int32_t *iter);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -55,7 +55,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
|||
taosArrayClear(pInfo->pBlockLists);
|
||||
|
||||
if (type == STREAM_INPUT__MERGED_SUBMIT) {
|
||||
ASSERT(numOfBlocks > 1);
|
||||
// ASSERT(numOfBlocks > 1);
|
||||
for (int32_t i = 0; i < numOfBlocks; i++) {
|
||||
SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*));
|
||||
taosArrayPush(pInfo->pBlockLists, &pReq);
|
||||
|
|
|
@ -2154,7 +2154,7 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo
|
|||
|
||||
static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock) {
|
||||
int32_t rows = pResBlock->info.rows;
|
||||
|
||||
blockDataEnsureCapacity(pResBlock, rows + 1);
|
||||
// todo set the correct primary timestamp column
|
||||
|
||||
// output the result
|
||||
|
|
|
@ -14,8 +14,8 @@
|
|||
*/
|
||||
|
||||
#include "tsimplehash.h"
|
||||
#include "os.h"
|
||||
#include "taoserror.h"
|
||||
#include "tlog.h"
|
||||
|
||||
#define SHASH_DEFAULT_LOAD_FACTOR 0.75
|
||||
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
|
||||
|
@ -31,10 +31,14 @@
|
|||
taosMemoryFreeClear(_n); \
|
||||
} while (0);
|
||||
|
||||
#pragma pack(push, 4)
|
||||
typedef struct SHNode {
|
||||
struct SHNode *next;
|
||||
uint32_t keyLen : 20;
|
||||
uint32_t dataLen : 12;
|
||||
char data[];
|
||||
} SHNode;
|
||||
#pragma pack(pop)
|
||||
|
||||
struct SSHashObj {
|
||||
SHNode **hashList;
|
||||
|
@ -42,8 +46,6 @@ struct SSHashObj {
|
|||
int64_t size; // number of elements in hash table
|
||||
_hash_fn_t hashFp; // hash function
|
||||
_equal_fn_t equalFp; // equal function
|
||||
int32_t keyLen;
|
||||
int32_t dataLen;
|
||||
};
|
||||
|
||||
static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
|
||||
|
@ -54,7 +56,7 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
|
|||
return i;
|
||||
}
|
||||
|
||||
SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t dataLen) {
|
||||
SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) {
|
||||
ASSERT(fn != NULL);
|
||||
|
||||
if (capacity == 0) {
|
||||
|
@ -74,8 +76,6 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t
|
|||
pHashObj->hashFp = fn;
|
||||
ASSERT((pHashObj->capacity & (pHashObj->capacity - 1)) == 0);
|
||||
|
||||
pHashObj->keyLen = keyLen;
|
||||
pHashObj->dataLen = dataLen;
|
||||
|
||||
pHashObj->hashList = (SHNode **)taosMemoryCalloc(pHashObj->capacity, sizeof(void *));
|
||||
if (!pHashObj->hashList) {
|
||||
|
@ -93,40 +93,41 @@ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj) {
|
|||
return (int32_t)atomic_load_64((int64_t *)&pHashObj->size);
|
||||
}
|
||||
|
||||
static SHNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) {
|
||||
SHNode *pNewNode = taosMemoryMalloc(sizeof(SHNode) + keyLen + dsize);
|
||||
static SHNode *doCreateHashNode(const void *key, size_t keyLen, const void *data, size_t dataLen, uint32_t hashVal) {
|
||||
SHNode *pNewNode = taosMemoryMalloc(sizeof(SHNode) + keyLen + dataLen);
|
||||
if (!pNewNode) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pNewNode->keyLen = keyLen;
|
||||
pNewNode->dataLen = dataLen;
|
||||
pNewNode->next = NULL;
|
||||
memcpy(GET_SHASH_NODE_DATA(pNewNode), pData, dsize);
|
||||
memcpy(GET_SHASH_NODE_KEY(pNewNode, dsize), key, keyLen);
|
||||
memcpy(GET_SHASH_NODE_DATA(pNewNode), data, dataLen);
|
||||
memcpy(GET_SHASH_NODE_KEY(pNewNode, dataLen), key, keyLen);
|
||||
return pNewNode;
|
||||
}
|
||||
|
||||
static void taosHashTableResize(SSHashObj *pHashObj) {
|
||||
static void tSimpleHashTableResize(SSHashObj *pHashObj) {
|
||||
if (!SHASH_NEED_RESIZE(pHashObj)) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t newCapacity = (int32_t)(pHashObj->capacity << 1u);
|
||||
if (newCapacity > HASH_MAX_CAPACITY) {
|
||||
// uDebug("current capacity:%zu, maximum capacity:%d, no resize applied due to limitation is reached",
|
||||
// pHashObj->capacity, HASH_MAX_CAPACITY);
|
||||
uDebug("current capacity:%zu, maximum capacity:%" PRIu64 ", no resize applied due to limitation is reached",
|
||||
pHashObj->capacity, HASH_MAX_CAPACITY);
|
||||
return;
|
||||
}
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
void *pNewEntryList = taosMemoryRealloc(pHashObj->hashList, sizeof(void *) * newCapacity);
|
||||
if (!pNewEntryList) {
|
||||
// qWarn("hash resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity);
|
||||
uWarn("hash resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity);
|
||||
return;
|
||||
}
|
||||
|
||||
size_t inc = newCapacity - pHashObj->capacity;
|
||||
memset((char *)pNewEntryList + pHashObj->capacity * sizeof(void *), 0, inc);
|
||||
memset((char *)pNewEntryList + pHashObj->capacity * sizeof(void *), 0, inc * sizeof(void *));
|
||||
|
||||
pHashObj->hashList = pNewEntryList;
|
||||
pHashObj->capacity = newCapacity;
|
||||
|
@ -141,8 +142,8 @@ static void taosHashTableResize(SSHashObj *pHashObj) {
|
|||
SHNode *pPrev = NULL;
|
||||
|
||||
while (pNode != NULL) {
|
||||
void *key = GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen);
|
||||
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->keyLen);
|
||||
void *key = GET_SHASH_NODE_KEY(pNode, pNode->dataLen);
|
||||
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pNode->keyLen);
|
||||
|
||||
int32_t newIdx = HASH_INDEX(hashVal, pHashObj->capacity);
|
||||
pNext = pNode->next;
|
||||
|
@ -170,23 +171,23 @@ static void taosHashTableResize(SSHashObj *pHashObj) {
|
|||
// ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
|
||||
}
|
||||
|
||||
int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) {
|
||||
int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t dataLen) {
|
||||
if (!pHashObj || !key) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->keyLen);
|
||||
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
|
||||
|
||||
// need the resize process, write lock applied
|
||||
if (SHASH_NEED_RESIZE(pHashObj)) {
|
||||
taosHashTableResize(pHashObj);
|
||||
tSimpleHashTableResize(pHashObj);
|
||||
}
|
||||
|
||||
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
|
||||
|
||||
SHNode *pNode = pHashObj->hashList[slot];
|
||||
if (!pNode) {
|
||||
SHNode *pNewNode = doCreateHashNode(key, pHashObj->keyLen, data, pHashObj->dataLen, hashVal);
|
||||
SHNode *pNewNode = doCreateHashNode(key, keyLen, data, dataLen, hashVal);
|
||||
if (!pNewNode) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -197,14 +198,14 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) {
|
|||
}
|
||||
|
||||
while (pNode) {
|
||||
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen), key, pHashObj->keyLen) == 0) {
|
||||
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) {
|
||||
break;
|
||||
}
|
||||
pNode = pNode->next;
|
||||
}
|
||||
|
||||
if (!pNode) {
|
||||
SHNode *pNewNode = doCreateHashNode(key, pHashObj->keyLen, data, pHashObj->dataLen, hashVal);
|
||||
SHNode *pNewNode = doCreateHashNode(key, keyLen, data, dataLen, hashVal);
|
||||
if (!pNewNode) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -212,16 +213,16 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) {
|
|||
pHashObj->hashList[slot] = pNewNode;
|
||||
atomic_add_fetch_64(&pHashObj->size, 1);
|
||||
} else { // update data
|
||||
memcpy(GET_SHASH_NODE_DATA(pNode), data, pHashObj->dataLen);
|
||||
memcpy(GET_SHASH_NODE_DATA(pNode), data, dataLen);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void *key, int32_t index) {
|
||||
static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void *key, size_t keyLen, int32_t index) {
|
||||
SHNode *pNode = pHashObj->hashList[index];
|
||||
while (pNode) {
|
||||
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen), key, pHashObj->keyLen) == 0) {
|
||||
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -233,12 +234,12 @@ static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void
|
|||
|
||||
static FORCE_INLINE bool taosHashTableEmpty(const SSHashObj *pHashObj) { return tSimpleHashGetSize(pHashObj) == 0; }
|
||||
|
||||
void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) {
|
||||
void *tSimpleHashGet(SSHashObj *pHashObj, const void *key, size_t keyLen) {
|
||||
if (!pHashObj || taosHashTableEmpty(pHashObj) || !key) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->keyLen);
|
||||
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
|
||||
|
||||
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
|
||||
SHNode *pNode = pHashObj->hashList[slot];
|
||||
|
@ -247,7 +248,7 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) {
|
|||
}
|
||||
|
||||
char *data = NULL;
|
||||
pNode = doSearchInEntryList(pHashObj, key, slot);
|
||||
pNode = doSearchInEntryList(pHashObj, key, keyLen, slot);
|
||||
if (pNode != NULL) {
|
||||
data = GET_SHASH_NODE_DATA(pNode);
|
||||
}
|
||||
|
@ -255,19 +256,19 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) {
|
|||
return data;
|
||||
}
|
||||
|
||||
int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key) {
|
||||
int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) {
|
||||
if (!pHashObj || !key) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->keyLen);
|
||||
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
|
||||
|
||||
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
|
||||
|
||||
SHNode *pNode = pHashObj->hashList[slot];
|
||||
SHNode *pPrev = NULL;
|
||||
while (pNode) {
|
||||
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen), key, pHashObj->keyLen) == 0) {
|
||||
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) {
|
||||
if (!pPrev) {
|
||||
pHashObj->hashList[slot] = pNode->next;
|
||||
} else {
|
||||
|
@ -312,6 +313,7 @@ void tSimpleHashCleanup(SSHashObj *pHashObj) {
|
|||
|
||||
tSimpleHashClear(pHashObj);
|
||||
taosMemoryFreeClear(pHashObj->hashList);
|
||||
taosMemoryFree(pHashObj);
|
||||
}
|
||||
|
||||
size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj) {
|
||||
|
@ -322,23 +324,13 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj) {
|
|||
return (pHashObj->capacity * sizeof(void *)) + sizeof(SHNode) * tSimpleHashGetSize(pHashObj) + sizeof(SSHashObj);
|
||||
}
|
||||
|
||||
void *tSimpleHashGetKey(const SSHashObj *pHashObj, void *data, size_t *keyLen) {
|
||||
#if 0
|
||||
int32_t offset = offsetof(SHNode, data);
|
||||
SHNode *node = ((SHNode *)(char *)data - offset);
|
||||
void *tSimpleHashGetKey(void *data, size_t *keyLen) {
|
||||
SHNode *node = (SHNode *)((char *)data - offsetof(SHNode, data));
|
||||
if (keyLen) {
|
||||
*keyLen = pHashObj->keyLen;
|
||||
*keyLen = node->keyLen;
|
||||
}
|
||||
|
||||
return POINTER_SHIFT(data, pHashObj->dataLen);
|
||||
|
||||
return GET_SHASH_NODE_KEY(node, pHashObj->dataLen);
|
||||
#endif
|
||||
if (keyLen) {
|
||||
*keyLen = pHashObj->keyLen;
|
||||
}
|
||||
|
||||
return POINTER_SHIFT(data, pHashObj->dataLen);
|
||||
return POINTER_SHIFT(data, node->dataLen);
|
||||
}
|
||||
|
||||
void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) {
|
||||
|
@ -378,3 +370,50 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) {
|
|||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void *tSimpleHashIterateKV(const SSHashObj *pHashObj, void *data, void **key, int32_t *iter) {
|
||||
if (!pHashObj) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SHNode *pNode = NULL;
|
||||
|
||||
if (!data) {
|
||||
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
|
||||
pNode = pHashObj->hashList[i];
|
||||
if (!pNode) {
|
||||
continue;
|
||||
}
|
||||
*iter = i;
|
||||
if (key) {
|
||||
*key = GET_SHASH_NODE_KEY(pNode, pNode->dataLen);
|
||||
}
|
||||
return GET_SHASH_NODE_DATA(pNode);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pNode = (SHNode *)((char *)data - offsetof(SHNode, data));
|
||||
|
||||
if (pNode->next) {
|
||||
if (key) {
|
||||
*key = GET_SHASH_NODE_KEY(pNode->next, pNode->next->dataLen);
|
||||
}
|
||||
return GET_SHASH_NODE_DATA(pNode->next);
|
||||
}
|
||||
|
||||
++(*iter);
|
||||
for (int32_t i = *iter; i < pHashObj->capacity; ++i) {
|
||||
pNode = pHashObj->hashList[i];
|
||||
if (!pNode) {
|
||||
continue;
|
||||
}
|
||||
*iter = i;
|
||||
if (key) {
|
||||
*key = GET_SHASH_NODE_KEY(pNode, pNode->dataLen);
|
||||
}
|
||||
return GET_SHASH_NODE_DATA(pNode);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
|
@ -32,31 +32,33 @@
|
|||
|
||||
TEST(testCase, tSimpleHashTest) {
|
||||
SSHashObj *pHashObj =
|
||||
tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), sizeof(int64_t), sizeof(int64_t));
|
||||
tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
|
||||
|
||||
assert(pHashObj != nullptr);
|
||||
|
||||
ASSERT_EQ(0, tSimpleHashGetSize(pHashObj));
|
||||
|
||||
size_t keyLen = sizeof(int64_t);
|
||||
size_t dataLen = sizeof(int64_t);
|
||||
|
||||
int64_t originKeySum = 0;
|
||||
for (int64_t i = 1; i <= 100; ++i) {
|
||||
originKeySum += i;
|
||||
tSimpleHashPut(pHashObj, (const void *)&i, (const void *)&i);
|
||||
tSimpleHashPut(pHashObj, (const void *)&i, keyLen, (const void *)&i, dataLen);
|
||||
ASSERT_EQ(i, tSimpleHashGetSize(pHashObj));
|
||||
}
|
||||
|
||||
for (int64_t i = 1; i <= 100; ++i) {
|
||||
void *data = tSimpleHashGet(pHashObj, (const void *)&i);
|
||||
void *data = tSimpleHashGet(pHashObj, (const void *)&i, keyLen);
|
||||
ASSERT_EQ(i, *(int64_t *)data);
|
||||
}
|
||||
|
||||
|
||||
void *data = NULL;
|
||||
int32_t iter = 0;
|
||||
int64_t keySum = 0;
|
||||
int64_t dataSum = 0;
|
||||
while ((data = tSimpleHashIterate(pHashObj, data, &iter))) {
|
||||
void *key = tSimpleHashGetKey(pHashObj, data, NULL);
|
||||
void *key = tSimpleHashGetKey(data, NULL);
|
||||
keySum += *(int64_t *)key;
|
||||
dataSum += *(int64_t *)data;
|
||||
}
|
||||
|
@ -65,7 +67,7 @@ TEST(testCase, tSimpleHashTest) {
|
|||
ASSERT_EQ(keySum, originKeySum);
|
||||
|
||||
for (int64_t i = 1; i <= 100; ++i) {
|
||||
tSimpleHashRemove(pHashObj, (const void *)&i);
|
||||
tSimpleHashRemove(pHashObj, (const void *)&i, keyLen);
|
||||
ASSERT_EQ(100 - i, tSimpleHashGetSize(pHashObj));
|
||||
}
|
||||
|
||||
|
|
|
@ -32,7 +32,6 @@ typedef struct {
|
|||
|
||||
static SStreamGlobalEnv streamEnv;
|
||||
|
||||
int32_t streamExec(SStreamTask* pTask);
|
||||
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch);
|
||||
|
||||
int32_t streamDispatch(SStreamTask* pTask);
|
||||
|
|
|
@ -185,7 +185,9 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
|||
tFreeStreamDispatchReq(pReq);
|
||||
|
||||
if (exec) {
|
||||
streamTryExec(pTask);
|
||||
if (streamTryExec(pTask) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
streamDispatch(pTask);
|
||||
|
@ -221,7 +223,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
|
|||
}
|
||||
|
||||
int32_t streamProcessRunReq(SStreamTask* pTask) {
|
||||
streamTryExec(pTask);
|
||||
if (streamTryExec(pTask) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
streamDispatch(pTask);
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#include "executor.h"
|
||||
#include "tstream.h"
|
||||
#include "ttimer.h"
|
||||
|
||||
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) {
|
||||
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
|
||||
|
@ -99,16 +100,19 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char*
|
|||
goto FAIL;
|
||||
}
|
||||
|
||||
taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
|
||||
if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), msg, msgLen, &pMeta->txn) < 0) {
|
||||
taosHashRemove(pMeta->pTasks, &pTask->taskId, sizeof(int32_t));
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
goto FAIL;
|
||||
}
|
||||
return 0;
|
||||
|
||||
FAIL:
|
||||
if (pTask) taosMemoryFree(pTask);
|
||||
if (pTask) tFreeSStreamTask(pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -158,11 +162,28 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
|
|||
SStreamTask* pTask = *ppTask;
|
||||
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
|
||||
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
|
||||
}
|
||||
|
||||
if (tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), &pMeta->txn) < 0) {
|
||||
/*return -1;*/
|
||||
}
|
||||
|
||||
if (pTask->triggerParam != 0) {
|
||||
taosTmrStop(pTask->timer);
|
||||
}
|
||||
|
||||
while (1) {
|
||||
int8_t schedStatus =
|
||||
atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__DROPPING);
|
||||
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
|
||||
tFreeSStreamTask(pTask);
|
||||
break;
|
||||
} else if (schedStatus == TASK_SCHED_STATUS__DROPPING) {
|
||||
break;
|
||||
}
|
||||
taosMsleep(10);
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "tstream.h"
|
||||
#include "streamInc.h"
|
||||
|
||||
SStreamQueue* streamQueueOpen() {
|
||||
SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
|
||||
|
@ -36,9 +36,12 @@ void streamQueueClose(SStreamQueue* queue) {
|
|||
while (1) {
|
||||
void* qItem = streamQueueNextItem(queue);
|
||||
if (qItem) {
|
||||
taosFreeQitem(qItem);
|
||||
streamFreeQitem(qItem);
|
||||
} else {
|
||||
return;
|
||||
break;
|
||||
}
|
||||
}
|
||||
taosFreeQall(queue->qall);
|
||||
taosCloseQueue(queue->queue);
|
||||
taosMemoryFree(queue);
|
||||
}
|
||||
|
|
|
@ -152,9 +152,17 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
void tFreeSStreamTask(SStreamTask* pTask) {
|
||||
streamQueueClose(pTask->inputQueue);
|
||||
streamQueueClose(pTask->outputQueue);
|
||||
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
|
||||
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
|
||||
if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg);
|
||||
if (pTask->exec.executor) qDestroyTask(pTask->exec.executor);
|
||||
taosArrayDestroy(pTask->childEpInfo);
|
||||
if (pTask->outputType == TASK_OUTPUT__TABLE) {
|
||||
tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper);
|
||||
taosMemoryFree(pTask->tbSink.pTSchema);
|
||||
}
|
||||
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
|
||||
}
|
||||
taosMemoryFree(pTask);
|
||||
}
|
||||
|
|
|
@ -237,7 +237,7 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode);
|
|||
bool syncNodeHasSnapshot(SSyncNode* pSyncNode);
|
||||
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode);
|
||||
|
||||
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode);
|
||||
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode);
|
||||
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode);
|
||||
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm);
|
||||
|
||||
|
|
|
@ -148,12 +148,35 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) {
|
|||
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
int64_t peerStartTime = syncIndexMgrGetStartTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]);
|
||||
int64_t peerRecvTime = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]);
|
||||
SyncIndex peerMatchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId)[i]);
|
||||
|
||||
int64_t recvTimeDiff = TABS(peerRecvTime - timeNow);
|
||||
int64_t startTimeDiff = TABS(peerStartTime - pSyncNode->startTime);
|
||||
int64_t logDiff = TABS(peerMatchIndex - syncNodeGetLastIndex(pSyncNode));
|
||||
|
||||
/*
|
||||
int64_t recvTimeDiff = syncNodeAbs64(peerRecvTime, timeNow);
|
||||
int64_t startTimeDiff = syncNodeAbs64(peerStartTime, pSyncNode->startTime);
|
||||
int64_t logDiff = syncNodeAbs64(peerMatchIndex, syncNodeGetLastIndex(pSyncNode));
|
||||
*/
|
||||
|
||||
int32_t addQuorum = 0;
|
||||
|
||||
if (recvTimeDiff < SYNC_MAX_RECV_TIME_RANGE_MS) {
|
||||
if (startTimeDiff < SYNC_MAX_START_TIME_RANGE_MS) {
|
||||
addQuorum = 1;
|
||||
} else {
|
||||
if (logDiff < SYNC_ADD_QUORUM_COUNT) {
|
||||
addQuorum = 1;
|
||||
} else {
|
||||
addQuorum = 0;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
addQuorum = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
if (recvTimeDiff < SYNC_MAX_RECV_TIME_RANGE_MS) {
|
||||
addQuorum = 1;
|
||||
} else {
|
||||
|
@ -163,6 +186,7 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) {
|
|||
if (startTimeDiff > SYNC_MAX_START_TIME_RANGE_MS) {
|
||||
addQuorum = 0;
|
||||
}
|
||||
*/
|
||||
|
||||
quorum += addQuorum;
|
||||
}
|
||||
|
|
|
@ -2284,7 +2284,7 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
|
|||
|
||||
// return max(logLastIndex, snapshotLastIndex)
|
||||
// if no snapshot and log, return -1
|
||||
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) {
|
||||
SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
|
||||
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
|
||||
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||
|
@ -2773,10 +2773,26 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p
|
|||
return 0;
|
||||
}
|
||||
|
||||
if (pEntry->term < ths->pRaftStore->currentTerm) {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "little term:%lu, can not do leader transfer", pEntry->term);
|
||||
syncNodeEventLog(ths, logBuf);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (pEntry->index < syncNodeGetLastIndex(ths)) {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "little index:%ld, can not do leader transfer", pEntry->index);
|
||||
syncNodeEventLog(ths, logBuf);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
if (ths->vgId > 1) {
|
||||
syncNodeEventLog(ths, "I am vnode, can not do leader transfer");
|
||||
return 0;
|
||||
}
|
||||
*/
|
||||
|
||||
do {
|
||||
char logBuf[128];
|
||||
|
|
|
@ -293,6 +293,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_ALREADY_EXIST, "Stream already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_NOT_EXIST, "Stream not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_OPTION, "Invalid stream option")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_MUST_BE_DELETED, "Stream must be dropped first")
|
||||
|
||||
// mnode-sma
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")
|
||||
|
@ -616,6 +617,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FILE_CORRUPTED, "Rsma file corrupted
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is messed up")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_SCHEMA, "Rsma invalid schema")
|
||||
|
||||
//index
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
|
||||
|
|
Loading…
Reference in New Issue