enh: rsma batch process
This commit is contained in:
parent
8c75274e13
commit
dab6c81769
|
@ -2660,7 +2660,7 @@ typedef struct {
|
|||
} SVgEpSet;
|
||||
|
||||
typedef struct {
|
||||
int32_t padding;
|
||||
// padding
|
||||
} SRSmaExecMsg;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -201,6 +201,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_FETCH_RSMA, "vnode-fetch-rsma", SRSmaFetchMsg, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_EXEC_RSMA, "vnode-exec-rsma", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_BATCH_DEL, "batch-delete", SBatchDeleteReq, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
|
||||
|
|
|
@ -614,6 +614,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_RSMA_REMOVE_EXISTS TAOS_DEF_ERROR_CODE(0, 0x3154)
|
||||
#define TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x3155)
|
||||
#define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156)
|
||||
#define TSDB_CODE_RSMA_INVALID_SCHEMA TAOS_DEF_ERROR_CODE(0, 0x3157)
|
||||
|
||||
//index
|
||||
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
|
||||
|
|
|
@ -338,6 +338,7 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_EXEC_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -57,9 +57,10 @@ typedef struct {
|
|||
void *tmrHandle; // shared by all fetch tasks
|
||||
} SSmaMgmt;
|
||||
|
||||
#define SMA_ENV_LOCK(env) (&(env)->lock)
|
||||
#define SMA_ENV_TYPE(env) ((env)->type)
|
||||
#define SMA_ENV_STAT(env) ((env)->pStat)
|
||||
#define SMA_ENV_LOCK(env) (&(env)->lock)
|
||||
#define SMA_ENV_TYPE(env) ((env)->type)
|
||||
#define SMA_ENV_STAT(env) ((env)->pStat)
|
||||
#define SMA_RSMA_STAT(sma) ((SRSmaStat *)SMA_ENV_STAT((SSmaEnv *)(sma)->pRSmaEnv))
|
||||
|
||||
struct STSmaStat {
|
||||
int8_t state; // ETsdbSmaStat
|
||||
|
@ -86,17 +87,19 @@ struct SQTaskFWriter {
|
|||
};
|
||||
|
||||
struct SRSmaStat {
|
||||
SSma *pSma;
|
||||
int64_t commitAppliedVer; // vnode applied version for async commit
|
||||
int64_t refId; // shared by fetch tasks
|
||||
SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo)
|
||||
int8_t triggerStat; // shared by fetch tasks
|
||||
int8_t commitStat; // 0 not in committing, 1 in committing
|
||||
SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w)
|
||||
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
|
||||
SHashObj *iRsmaInfoHash; // key: stbUid, value: SRSmaInfo; immutable rsmaInfoHash
|
||||
SSma *pSma;
|
||||
int64_t commitAppliedVer; // vnode applied version for async commit
|
||||
int64_t refId; // shared by fetch tasks
|
||||
volatile int64_t qBufSize; // queue buffer size
|
||||
SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo)
|
||||
int8_t triggerStat; // shared by fetch tasks
|
||||
int8_t commitStat; // 0 not in committing, 1 in committing
|
||||
int8_t execStat; // 0 not in exec , 1 in exec
|
||||
SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w)
|
||||
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
|
||||
};
|
||||
|
||||
|
||||
struct SSmaStat {
|
||||
union {
|
||||
STSmaStat tsmaStat; // time-range-wise sma
|
||||
|
@ -105,10 +108,9 @@ struct SSmaStat {
|
|||
T_REF_DECLARE()
|
||||
};
|
||||
|
||||
#define SMA_TSMA_STAT(s) (&(s)->tsmaStat)
|
||||
#define SMA_RSMA_STAT(s) (&(s)->rsmaStat)
|
||||
#define SMA_STAT_TSMA(s) (&(s)->tsmaStat)
|
||||
#define SMA_STAT_RSMA(s) (&(s)->rsmaStat)
|
||||
#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash)
|
||||
#define RSMA_IMU_INFO_HASH(r) ((r)->iRsmaInfoHash)
|
||||
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
|
||||
#define RSMA_COMMIT_STAT(r) (&(r)->commitStat)
|
||||
#define RSMA_REF_ID(r) ((r)->refId)
|
||||
|
@ -122,23 +124,25 @@ struct SRSmaInfoItem {
|
|||
};
|
||||
|
||||
struct SRSmaInfo {
|
||||
STSchema *pTSchema;
|
||||
STaosQueue *queue; // buffer queue of SubmitReq
|
||||
STaosQall *qall;
|
||||
int64_t suid;
|
||||
int64_t refId; // refId of SRSmaStat
|
||||
int8_t delFlag;
|
||||
STSchema *pTSchema;
|
||||
int64_t suid;
|
||||
int64_t refId; // refId of SRSmaStat
|
||||
int8_t delFlag;
|
||||
T_REF_DECLARE()
|
||||
SRSmaInfoItem items[TSDB_RETENTION_L2];
|
||||
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
|
||||
void *iTaskInfo[TSDB_RETENTION_L2]; // immutable
|
||||
STaosQueue *queue; // buffer queue of SubmitReq
|
||||
STaosQall *qall; // buffer qall of SubmitReq
|
||||
void *iTaskInfo[TSDB_RETENTION_L2]; // immutable qTaskInfo_t
|
||||
STaosQueue *iQueue; // immutable buffer queue of SubmitReq
|
||||
STaosQall *iQall; // immutable buffer qall of SubmitReq
|
||||
};
|
||||
|
||||
#define RSMA_INFO_HEAD_LEN offsetof(SRSmaInfo, items)
|
||||
#define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1)
|
||||
#define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1)
|
||||
#define RSMA_INFO_QTASK(r, i) ((r)->taskInfo[i])
|
||||
#define RSMA_INFO_IQTASK(r, i) ((r)->iTaskInfo[i])
|
||||
#define RSMA_INFO_IQTASK(r, i) ((r)->iTaskInfo[i])
|
||||
#define RSMA_INFO_ITEM(r, i) (&(r)->items[i])
|
||||
|
||||
enum {
|
||||
|
@ -230,12 +234,13 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) {
|
|||
|
||||
void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName);
|
||||
void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName);
|
||||
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo **pDest, SRSmaInfo *pSrc);
|
||||
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
|
||||
void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
|
||||
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
|
||||
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
|
||||
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
|
||||
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
|
||||
int32_t tdRSmaProcessExecImpl(SSma *pSma);
|
||||
|
||||
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
|
||||
int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer);
|
||||
|
|
|
@ -188,6 +188,7 @@ int32_t smaAsyncCommit(SSma* pSma);
|
|||
int32_t smaAsyncPostCommit(SSma* pSma);
|
||||
int32_t smaDoRetention(SSma* pSma, int64_t now);
|
||||
int32_t smaProcessFetch(SSma* pSma, void* pMsg);
|
||||
int32_t smaProcessExec(SSma* pSma, void* pMsg);
|
||||
|
||||
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
|
||||
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
|
||||
|
|
|
@ -83,8 +83,7 @@ int32_t smaBegin(SSma *pSma) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
|
||||
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
|
||||
|
||||
int8_t rsmaTriggerStat =
|
||||
atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED, TASK_TRIGGER_STAT_ACTIVE);
|
||||
|
@ -122,8 +121,8 @@ static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
|
||||
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
|
||||
SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat);
|
||||
|
||||
// step 1: set rsma stat paused
|
||||
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
|
||||
|
@ -289,8 +288,7 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(SMA_ENV_STAT(pSmaEnv));
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
|
||||
|
||||
// cleanup outdated qtaskinfo files
|
||||
tdCleanupQTaskInfoFiles(pSma, pRSmaStat);
|
||||
|
@ -314,7 +312,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
|
|||
}
|
||||
|
||||
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
|
||||
SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat);
|
||||
|
||||
// step 1: set rsma stat
|
||||
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
|
||||
|
@ -336,24 +334,30 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
|
|||
}
|
||||
}
|
||||
|
||||
// step 3: swap rsmaInfoHash and iRsmaInfoHash
|
||||
// step 3: consume the SubmitReq in buffer
|
||||
if (tdRSmaProcessExecImpl(pSma) < 0) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
// step 4: swap rsmaInfoHash and iRsmaInfoHash
|
||||
// lock
|
||||
taosWLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
|
||||
ASSERT(RSMA_INFO_HASH(pRSmaStat));
|
||||
ASSERT(!RSMA_IMU_INFO_HASH(pRSmaStat));
|
||||
|
||||
RSMA_IMU_INFO_HASH(pRSmaStat) = RSMA_INFO_HASH(pRSmaStat);
|
||||
RSMA_INFO_HASH(pRSmaStat) =
|
||||
taosHashInit(RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
||||
void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
|
||||
|
||||
if (!RSMA_INFO_HASH(pRSmaStat)) {
|
||||
// unlock
|
||||
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
smaError("vgId:%d, rsma async commit failed since %s", SMA_VID(pSma), terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
while (pIter) {
|
||||
SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
|
||||
TSWAP(pInfo->iQall, pInfo->qall);
|
||||
TSWAP(pInfo->iQueue, pInfo->queue);
|
||||
TSWAP(pInfo->iTaskInfo[0], pInfo->taskInfo[0]);
|
||||
TSWAP(pInfo->iTaskInfo[1], pInfo->taskInfo[1]);
|
||||
pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
|
||||
}
|
||||
|
||||
atomic_store_64(&pRSmaStat->qBufSize, 0);
|
||||
|
||||
// unlock
|
||||
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
|
||||
|
@ -375,11 +379,9 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
|
||||
|
||||
// perform persist task for qTaskInfo
|
||||
tdRSmaPersistExecImpl(pRSmaStat, RSMA_IMU_INFO_HASH(pRSmaStat));
|
||||
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
|
||||
tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -396,65 +398,68 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
|
||||
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
|
||||
SArray *rsmaDeleted = NULL;
|
||||
|
||||
// step 1: merge rsmaInfoHash and iRsmaInfoHash
|
||||
// step 1: merge qTaskInfo and iQTaskInfo
|
||||
// lock
|
||||
taosWLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
#if 0
|
||||
if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) {
|
||||
// just switch the hash pointer if rsmaInfoHash is empty
|
||||
if (taosHashGetSize(RSMA_IMU_INFO_HASH(pRSmaStat)) > 0) {
|
||||
SHashObj *infoHash = RSMA_INFO_HASH(pRSmaStat);
|
||||
RSMA_INFO_HASH(pRSmaStat) = RSMA_IMU_INFO_HASH(pRSmaStat);
|
||||
RSMA_IMU_INFO_HASH(pRSmaStat) = infoHash;
|
||||
}
|
||||
} else {
|
||||
#endif
|
||||
#if 1
|
||||
void *pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), NULL);
|
||||
|
||||
void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
|
||||
while (pIter) {
|
||||
tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
|
||||
|
||||
if (!taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t))) {
|
||||
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
|
||||
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
|
||||
int32_t refVal = T_REF_VAL_GET(pRSmaInfo);
|
||||
if (refVal == 0) {
|
||||
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
|
||||
smaDebug(
|
||||
"vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for "
|
||||
"table:%" PRIi64,
|
||||
SMA_VID(pSma), *pSuid);
|
||||
} else {
|
||||
smaDebug(
|
||||
"vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for "
|
||||
"table:%" PRIi64,
|
||||
SMA_VID(pSma), refVal, *pSuid);
|
||||
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
|
||||
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
|
||||
int32_t refVal = T_REF_VAL_GET(pRSmaInfo);
|
||||
if (refVal == 0) {
|
||||
if(!rsmaDeleted) {
|
||||
if((rsmaDeleted = taosArrayInit(1, sizeof(tb_uid_t)))){
|
||||
taosArrayPush(rsmaDeleted, pSuid);
|
||||
}
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter);
|
||||
continue;
|
||||
} else {
|
||||
smaDebug(
|
||||
"vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for "
|
||||
"table:%" PRIi64,
|
||||
SMA_VID(pSma), refVal, *pSuid);
|
||||
}
|
||||
taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
|
||||
smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma),
|
||||
*pSuid);
|
||||
} else {
|
||||
// free the resources
|
||||
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
|
||||
tdFreeRSmaInfo(pSma, pRSmaInfo, false);
|
||||
smaDebug("vgId:%d, rsma async post commit, free rsma info since already COW for table:%" PRIi64, SMA_VID(pSma),
|
||||
*pSuid);
|
||||
|
||||
pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
|
||||
continue;
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter);
|
||||
}
|
||||
#endif
|
||||
// }
|
||||
if (pRSmaInfo->taskInfo[0]) {
|
||||
if (pRSmaInfo->iTaskInfo[0]) {
|
||||
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pRSmaInfo->iTaskInfo[0];
|
||||
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
|
||||
pRSmaInfo->iTaskInfo[0] = NULL;
|
||||
}
|
||||
} else {
|
||||
TSWAP(pRSmaInfo->taskInfo[0], pRSmaInfo->iTaskInfo[0]);
|
||||
}
|
||||
|
||||
taosHashCleanup(RSMA_IMU_INFO_HASH(pRSmaStat));
|
||||
RSMA_IMU_INFO_HASH(pRSmaStat) = NULL;
|
||||
taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
|
||||
smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid);
|
||||
|
||||
pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(rsmaDeleted) > 0) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(rsmaDeleted); ++i) {
|
||||
tb_uid_t *pSuid = taosArrayGet(rsmaDeleted, i);
|
||||
void *pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t));
|
||||
if ((pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
|
||||
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
|
||||
smaDebug(
|
||||
"vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for "
|
||||
"table:%" PRIi64,
|
||||
SMA_VID(pSma), *pSuid);
|
||||
}
|
||||
taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t));
|
||||
}
|
||||
// remove suid in files
|
||||
taosArrayDestroy(rsmaDeleted);
|
||||
}
|
||||
|
||||
// unlock
|
||||
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
|
|
|
@ -315,9 +315,9 @@ void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
|
|||
int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
|
||||
if (pSmaStat) {
|
||||
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
|
||||
tdDestroyTSmaStat(SMA_TSMA_STAT(pSmaStat));
|
||||
tdDestroyTSmaStat(SMA_STAT_TSMA(pSmaStat));
|
||||
} else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSmaStat);
|
||||
SRSmaStat *pRSmaStat = &pSmaStat->rsmaStat;
|
||||
int32_t vid = SMA_VID(pRSmaStat->pSma);
|
||||
int64_t refId = RSMA_REF_ID(pRSmaStat);
|
||||
if (taosRemoveRef(smaMgmt.rsetId, RSMA_REF_ID(pRSmaStat)) < 0) {
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
#define RSMA_QTASKINFO_BUFSIZE 32768
|
||||
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
|
||||
#define RSMA_QTASKEXEC_BUFSIZ 1 // * 1048576 // 8 MB
|
||||
|
||||
SSmaMgmt smaMgmt = {
|
||||
.inited = 0,
|
||||
|
@ -27,17 +28,18 @@ SSmaMgmt smaMgmt = {
|
|||
#define TD_RSMAINFO_DEL_FILE "rsmainfo.del"
|
||||
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
|
||||
typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter;
|
||||
typedef struct SRSmaExecQItem SRSmaExecQItem;
|
||||
|
||||
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
|
||||
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids);
|
||||
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
|
||||
int8_t idx);
|
||||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid,
|
||||
int8_t level);
|
||||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo,
|
||||
tb_uid_t suid, int8_t level);
|
||||
static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid);
|
||||
static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
|
||||
static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
|
||||
int64_t suid, int8_t blkType);
|
||||
int64_t suid);
|
||||
static void tdRSmaFetchTrigger(void *param, void *tmrId);
|
||||
static int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level);
|
||||
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
|
||||
|
@ -76,6 +78,11 @@ struct SRSmaQTaskInfoIter {
|
|||
int32_t nBufPos;
|
||||
};
|
||||
|
||||
struct SRSmaExecQItem {
|
||||
void *pRSmaInfo;
|
||||
void *qall;
|
||||
};
|
||||
|
||||
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName) {
|
||||
tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
|
||||
}
|
||||
|
@ -143,8 +150,12 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
|
|||
if (isDeepFree) {
|
||||
if (pInfo->queue) taosCloseQueue(pInfo->queue);
|
||||
if (pInfo->qall) taosFreeQall(pInfo->qall);
|
||||
if (pInfo->iQueue) taosCloseQueue(pInfo->iQueue);
|
||||
if (pInfo->iQall) taosFreeQall(pInfo->iQall);
|
||||
pInfo->queue = NULL;
|
||||
pInfo->qall = NULL;
|
||||
pInfo->iQueue = NULL;
|
||||
pInfo->iQall = NULL;
|
||||
}
|
||||
|
||||
taosMemoryFree(pInfo);
|
||||
|
@ -362,9 +373,18 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
|
|||
if (!(pRSmaInfo->queue = taosOpenQueue())) {
|
||||
goto _err;
|
||||
}
|
||||
smaError("vgId:%d init bufSize:%" PRIi64 ", qMemSize:%" PRIi64, SMA_VID(pSma), atomic_load_64(&pStat->qBufSize),
|
||||
taosQueueMemorySize(pRSmaInfo->queue));
|
||||
|
||||
if (!(pRSmaInfo->qall = taosAllocateQall())) {
|
||||
goto _err;
|
||||
}
|
||||
if (!(pRSmaInfo->iQueue = taosOpenQueue())) {
|
||||
goto _err;
|
||||
}
|
||||
if (!(pRSmaInfo->iQall = taosAllocateQall())) {
|
||||
goto _err;
|
||||
}
|
||||
pRSmaInfo->suid = suid;
|
||||
pRSmaInfo->refId = RSMA_REF_ID(pStat);
|
||||
T_REF_INIT_VAL(pRSmaInfo, 1);
|
||||
|
@ -433,8 +453,7 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
|
||||
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
|
||||
|
||||
SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pReq->suid);
|
||||
|
||||
|
@ -619,7 +638,7 @@ _end:
|
|||
}
|
||||
|
||||
static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
|
||||
int64_t suid, int8_t blkType) {
|
||||
int64_t suid) {
|
||||
SArray *pResList = taosArrayInit(1, POINTER_BYTES);
|
||||
if (pResList == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -690,8 +709,50 @@ _err:
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid,
|
||||
int8_t level) {
|
||||
/**
|
||||
* @brief Copy msg to rsmaQueueBuffer
|
||||
*
|
||||
* @param pSma
|
||||
* @param pMsg
|
||||
* @param inputType
|
||||
* @param pInfo
|
||||
* @param suid
|
||||
* @return int32_t
|
||||
*/
|
||||
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo,
|
||||
tb_uid_t suid) {
|
||||
const SSubmitReq *pReq = (const SSubmitReq *)pMsg;
|
||||
|
||||
void *qItem = taosAllocateQitem(pReq->length, DEF_QITEM);
|
||||
if (!qItem) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
memcpy(qItem, pMsg, pReq->header.contLen);
|
||||
|
||||
taosWriteQitem(pInfo->queue, qItem);
|
||||
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
|
||||
int64_t size = atomic_fetch_add_64(&pRSmaStat->qBufSize, taosQueueMemorySize(pInfo->queue));
|
||||
smaError("vgId:%d originSize:%" PRIi64 ", after push size is:%" PRIi64, SMA_VID(pSma), size,
|
||||
atomic_load_64(&pRSmaStat->qBufSize));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief sync mode
|
||||
*
|
||||
* @param pSma
|
||||
* @param pMsg
|
||||
* @param msgSize
|
||||
* @param inputType
|
||||
* @param pInfo
|
||||
* @param suid
|
||||
* @param level
|
||||
* @return int32_t
|
||||
*/
|
||||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo,
|
||||
tb_uid_t suid, int8_t level) {
|
||||
int32_t idx = level - 1;
|
||||
if (!pInfo || !RSMA_INFO_QTASK(pInfo, idx)) {
|
||||
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid);
|
||||
|
@ -705,14 +766,13 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
|
|||
smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level,
|
||||
RSMA_INFO_QTASK(pInfo, idx), suid);
|
||||
|
||||
if (qSetMultiStreamInput(RSMA_INFO_QTASK(pInfo, idx), pMsg, 1, inputType) < 0) { // INPUT__DATA_SUBMIT
|
||||
if (qSetMultiStreamInput(RSMA_INFO_QTASK(pInfo, idx), pMsg, msgSize, inputType) < 0) { // INPUT__DATA_SUBMIT
|
||||
smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
|
||||
tdRSmaFetchAndSubmitResult(pSma, RSMA_INFO_QTASK(pInfo, idx), pItem, pInfo->pTSchema, suid,
|
||||
STREAM_INPUT__DATA_SUBMIT);
|
||||
tdRSmaFetchAndSubmitResult(pSma, RSMA_INFO_QTASK(pInfo, idx), pItem, pInfo->pTSchema, suid);
|
||||
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
|
||||
|
||||
if (smaMgmt.tmrHandle) {
|
||||
|
@ -752,8 +812,15 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
|
|||
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
return NULL;
|
||||
}
|
||||
if (!pRSmaInfo->taskInfo[0]) {
|
||||
if (tdCloneRSmaInfo(pSma, pRSmaInfo) < 0) {
|
||||
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
tdRefRSmaInfo(pSma, pRSmaInfo);
|
||||
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
ASSERT(pRSmaInfo->suid == suid);
|
||||
return pRSmaInfo;
|
||||
}
|
||||
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
|
@ -762,41 +829,9 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
// clone the SRSmaInfo from iRsmaInfoHash to rsmaInfoHash if in committing stat
|
||||
SRSmaInfo *pCowRSmaInfo = NULL;
|
||||
// lock
|
||||
taosWLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
if (!(pCowRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)))) { // 2-phase lock
|
||||
void *iRSmaInfo = taosHashGet(RSMA_IMU_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
|
||||
if (iRSmaInfo) {
|
||||
SRSmaInfo *pIRSmaInfo = *(SRSmaInfo **)iRSmaInfo;
|
||||
if (pIRSmaInfo && !RSMA_INFO_IS_DEL(pIRSmaInfo)) {
|
||||
if (tdCloneRSmaInfo(pSma, &pCowRSmaInfo, pIRSmaInfo) < 0) {
|
||||
// unlock
|
||||
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr());
|
||||
return NULL;
|
||||
}
|
||||
smaDebug("vgId:%d, clone rsma info succeed for suid:%" PRIu64, SMA_VID(pSma), suid);
|
||||
if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pCowRSmaInfo, sizeof(pCowRSmaInfo)) < 0) {
|
||||
// unlock
|
||||
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr());
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
pCowRSmaInfo = *(SRSmaInfo **)pCowRSmaInfo;
|
||||
ASSERT(!pCowRSmaInfo);
|
||||
}
|
||||
|
||||
if (pCowRSmaInfo) {
|
||||
tdRefRSmaInfo(pSma, pCowRSmaInfo);
|
||||
}
|
||||
// unlock
|
||||
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
return pCowRSmaInfo;
|
||||
return pRSmaInfo;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
|
||||
|
@ -805,10 +840,47 @@ static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief async mode
|
||||
*
|
||||
* @param pSma
|
||||
* @param pMsg
|
||||
* @param inputType
|
||||
* @param suid
|
||||
* @return int32_t
|
||||
*/
|
||||
static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) {
|
||||
SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid);
|
||||
if (!pRSmaInfo) {
|
||||
smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
|
||||
if (tdExecuteRSmaImplAsync(pSma, pMsg, inputType, pRSmaInfo, suid) < 0) {
|
||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
}
|
||||
|
||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
#if 0
|
||||
/**
|
||||
* @brief sync mode
|
||||
*
|
||||
* @param pSma
|
||||
* @param pMsg
|
||||
* @param inputType
|
||||
* @param suid
|
||||
* @return int32_t
|
||||
*/
|
||||
static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) {
|
||||
SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid);
|
||||
if (!pRSmaInfo) {
|
||||
smaError("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
|
||||
smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -820,6 +892,47 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb
|
|||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
#endif
|
||||
|
||||
static int32_t tdRSmaExecCheck(SSma *pSma) {
|
||||
SRSmaStat *pRsmaStat = SMA_RSMA_STAT(pSma);
|
||||
int64_t bufSize = atomic_load_64(&pRsmaStat->qBufSize);
|
||||
|
||||
if ((pRsmaStat->execStat == 1) || (bufSize < RSMA_QTASKEXEC_BUFSIZ)) {
|
||||
smaError("vgId:%d, return directly as execStat:%" PRIi8 ", bufSize:%" PRIi64, SMA_VID(pSma), pRsmaStat->execStat,
|
||||
bufSize);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
smaError("vgId:%d, go on exec as execStat:%" PRIi8 ", bufSize:%" PRIi64, SMA_VID(pSma), pRsmaStat->execStat, bufSize);
|
||||
|
||||
pRsmaStat->execStat = 1;
|
||||
|
||||
SRSmaExecMsg fetchMsg;
|
||||
int32_t contLen = sizeof(SMsgHead);
|
||||
void *pBuf = rpcMallocCont(0 + contLen);
|
||||
|
||||
((SMsgHead *)pBuf)->vgId = SMA_VID(pSma);
|
||||
((SMsgHead *)pBuf)->contLen = sizeof(SMsgHead);
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
.code = 0,
|
||||
.msgType = TDMT_VND_EXEC_RSMA,
|
||||
.pCont = pBuf,
|
||||
.contLen = contLen,
|
||||
};
|
||||
|
||||
if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, QUERY_QUEUE, &rpcMsg)) != 0) {
|
||||
smaError("vgId:%d, failed to put rsma exec msg into query-queue since %s", SMA_VID(pSma), terrstr());
|
||||
goto _err;
|
||||
}
|
||||
|
||||
smaDebug("vgId:%d, success to put rsma fetch msg into query-queue", SMA_VID(pSma));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
pRsmaStat->execStat = 0;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
|
||||
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||
|
@ -839,16 +952,18 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
|
|||
tdFetchSubmitReqSuids(pMsg, &uidStore);
|
||||
|
||||
if (uidStore.suid != 0) {
|
||||
tdExecuteRSma(pSma, pMsg, inputType, uidStore.suid);
|
||||
tdExecuteRSmaAsync(pSma, pMsg, inputType, uidStore.suid);
|
||||
|
||||
void *pIter = taosHashIterate(uidStore.uidHash, NULL);
|
||||
while (pIter) {
|
||||
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
|
||||
tdExecuteRSma(pSma, pMsg, inputType, *pTbSuid);
|
||||
tdExecuteRSmaAsync(pSma, pMsg, inputType, *pTbSuid);
|
||||
pIter = taosHashIterate(uidStore.uidHash, pIter);
|
||||
}
|
||||
|
||||
tdUidStoreDestory(&uidStore);
|
||||
|
||||
tdRSmaExecCheck(pSma);
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1282,7 +1397,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
|||
}
|
||||
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pRSmaInfo, i);
|
||||
qTaskInfo_t taskInfo = RSMA_INFO_IQTASK(pRSmaInfo, i);
|
||||
if (!taskInfo) {
|
||||
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1);
|
||||
continue;
|
||||
|
@ -1452,7 +1567,7 @@ _end:
|
|||
* @param level
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) {
|
||||
static int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) {
|
||||
SRSmaFetchMsg fetchMsg = {.suid = pInfo->suid, .level = level};
|
||||
int32_t ret = 0;
|
||||
int32_t contLen = 0;
|
||||
|
@ -1479,7 +1594,7 @@ int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) {
|
|||
.code = 0,
|
||||
.msgType = TDMT_VND_FETCH_RSMA,
|
||||
.pCont = pBuf,
|
||||
.contLen = contLen,
|
||||
.contLen = contLen + sizeof(SMsgHead),
|
||||
};
|
||||
|
||||
if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, QUERY_QUEUE, &rpcMsg)) != 0) {
|
||||
|
@ -1541,7 +1656,7 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) {
|
|||
if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid, STREAM_INPUT__DATA_BLOCK) < 0) {
|
||||
if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
|
@ -1558,3 +1673,125 @@ _err:
|
|||
smaError("vgId:%d, failed to process rsma fetch msg since %s", SMA_VID(pSma), terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
static void tdFreeRSmaSubmitItems(SArray *pItems) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) {
|
||||
taosFreeQitem(*(void **)taosArrayGet(pItems, i));
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tdRSmaProcessExecImpl(SSma *pSma) {
|
||||
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
|
||||
SHashObj *infoHash = NULL;
|
||||
SArray *pSubmitQArr = NULL;
|
||||
SArray *pSubmitArr = NULL;
|
||||
|
||||
if (!pRSmaStat || !(infoHash = RSMA_INFO_HASH(pRSmaStat))) {
|
||||
terrno = TSDB_CODE_RSMA_INVALID_STAT;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
taosRLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
if (atomic_load_64(&pRSmaStat->qBufSize) < RSMA_QTASKEXEC_BUFSIZ) {
|
||||
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
|
||||
if (!(pSubmitQArr = taosArrayInit(taosHashGetSize(infoHash), sizeof(SRSmaExecQItem)))) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (!(pSubmitArr = taosArrayInit(1024, POINTER_BYTES))) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
SRSmaExecQItem qItem = {0};
|
||||
taosWLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
void *pIter = taosHashIterate(infoHash, NULL);
|
||||
while (pIter) {
|
||||
SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
|
||||
if (taosQueueItemSize(pInfo->queue)) {
|
||||
taosReadAllQitems(pInfo->queue, pInfo->qall);
|
||||
qItem.qall = &pInfo->qall;
|
||||
qItem.pRSmaInfo = pIter;
|
||||
taosArrayPush(pSubmitQArr, &qItem);
|
||||
}
|
||||
ASSERT(taosQueueItemSize(pInfo->queue) == 0);
|
||||
pIter = taosHashIterate(infoHash, pIter);
|
||||
}
|
||||
|
||||
atomic_store_64(&pRSmaStat->qBufSize, 0);
|
||||
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
smaError("vgId:%d after exec qBufSize is:%" PRIi64, SMA_VID(pSma), atomic_load_64(&pRSmaStat->qBufSize));
|
||||
|
||||
int32_t qSize = taosArrayGetSize(pSubmitQArr);
|
||||
for (int32_t i = 0; i < qSize; ++i) {
|
||||
SRSmaExecQItem *pItem = taosArrayGet(pSubmitQArr, i);
|
||||
while (1) {
|
||||
void *msg = NULL;
|
||||
taosGetQitem(*(STaosQall **)pItem->qall, (void **)&msg);
|
||||
if (msg) {
|
||||
if (taosArrayPush(pSubmitArr, &msg) < 0) {
|
||||
tdFreeRSmaSubmitItems(pSubmitArr);
|
||||
goto _err;
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t size = taosArrayGetSize(pSubmitArr);
|
||||
if (size > 0) {
|
||||
SRSmaInfo *pInfo = *(SRSmaInfo **)pItem->pRSmaInfo;
|
||||
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
|
||||
if (tdExecuteRSmaImpl(pSma, *(SSubmitReq**)pSubmitArr->pData, size, STREAM_INPUT__DATA_SUBMIT, pInfo, pInfo->suid, i) < 0) {
|
||||
tdFreeRSmaSubmitItems(pSubmitArr);
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
tdFreeRSmaSubmitItems(pSubmitArr);
|
||||
taosArrayClear(pSubmitArr);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayDestroy(pSubmitArr);
|
||||
taosArrayDestroy(pSubmitQArr);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
taosArrayDestroy(pSubmitArr);
|
||||
taosArrayDestroy(pSubmitQArr);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief exec rsma level 1data, fetch result of level 2/3 and submit
|
||||
*
|
||||
* @param pSma
|
||||
* @param pMsg
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t smaProcessExec(SSma *pSma, void *pMsg) {
|
||||
SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg;
|
||||
SRSmaStat *pRsmaStat = SMA_RSMA_STAT(pSma);
|
||||
|
||||
if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) {
|
||||
terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (tdRSmaProcessExecImpl(pSma) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pRsmaStat->execStat = 0;
|
||||
smaWarn("vgId:%d, success to process rsma exec msg", SMA_VID(pSma));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
pRsmaStat->execStat = 0;
|
||||
smaError("vgId:%d, failed to process rsma fetch msg since %s", SMA_VID(pSma), terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
|
|
@ -175,7 +175,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
|||
}
|
||||
|
||||
tdRefSmaStat(pSma, pStat);
|
||||
pTsmaStat = SMA_TSMA_STAT(pStat);
|
||||
pTsmaStat = SMA_STAT_TSMA(pStat);
|
||||
|
||||
if (!pTsmaStat->pTSma) {
|
||||
STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid);
|
||||
|
|
|
@ -350,49 +350,45 @@ _err:
|
|||
}
|
||||
|
||||
/**
|
||||
* @brief pTSchema is shared
|
||||
* @brief Clone qTaskInfo of SRSmaInfo
|
||||
*
|
||||
* @param pSma
|
||||
* @param pDest
|
||||
* @param pSrc
|
||||
* @param pInfo
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo **pDest, SRSmaInfo *pSrc) {
|
||||
SVnode *pVnode = pSma->pVnode;
|
||||
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
|
||||
SRSmaParam *param = NULL;
|
||||
if (!pSrc) {
|
||||
*pDest = NULL;
|
||||
if (!pInfo) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, SMA_META(pSma), 0);
|
||||
smaDebug("vgId:%d, rsma clone, suid is %" PRIi64, TD_VID(pVnode), pSrc->suid);
|
||||
if (metaGetTableEntryByUid(&mr, pSrc->suid) < 0) {
|
||||
smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), pSrc->suid,
|
||||
smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid);
|
||||
if (metaGetTableEntryByUid(&mr, pInfo->suid) < 0) {
|
||||
smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid,
|
||||
terrstr());
|
||||
goto _err;
|
||||
}
|
||||
ASSERT(mr.me.type == TSDB_SUPER_TABLE);
|
||||
ASSERT(mr.me.uid == pSrc->suid);
|
||||
ASSERT(mr.me.uid == pInfo->suid);
|
||||
if (TABLE_IS_ROLLUP(mr.me.flags)) {
|
||||
param = &mr.me.stbEntry.rsmaParam;
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
if (tdCloneQTaskInfo(pSma, pSrc->iTaskInfo[i], pSrc->taskInfo[i], param, pSrc->suid, i) < 0) {
|
||||
if (tdCloneQTaskInfo(pSma, pInfo->taskInfo[i], pInfo->iTaskInfo[i], param, pInfo->suid, i) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
smaDebug("vgId:%d, rsma clone env success for %" PRIi64, TD_VID(pVnode), pSrc->suid);
|
||||
smaDebug("vgId:%d, rsma clone env success for %" PRIi64, SMA_VID(pSma), pInfo->suid);
|
||||
} else {
|
||||
terrno = TSDB_CODE_RSMA_INVALID_SCHEMA;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
metaReaderClear(&mr);
|
||||
|
||||
*pDest = pSrc; // pointer copy
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
*pDest = NULL;
|
||||
metaReaderClear(&mr);
|
||||
smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", TD_VID(pVnode), pSrc->suid, terrstr());
|
||||
smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid, terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
|
@ -295,6 +295,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
|
||||
case TDMT_VND_FETCH_RSMA:
|
||||
return smaProcessFetch(pVnode->pSma, pMsg);
|
||||
case TDMT_VND_EXEC_RSMA:
|
||||
return smaProcessExec(pVnode->pSma, pMsg);
|
||||
default:
|
||||
vError("unknown msg type:%d in query queue", pMsg->msgType);
|
||||
return TSDB_CODE_VND_APP_ERROR;
|
||||
|
|
|
@ -55,7 +55,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
|||
taosArrayClear(pInfo->pBlockLists);
|
||||
|
||||
if (type == STREAM_INPUT__MERGED_SUBMIT) {
|
||||
ASSERT(numOfBlocks > 1);
|
||||
// ASSERT(numOfBlocks > 1);
|
||||
for (int32_t i = 0; i < numOfBlocks; i++) {
|
||||
SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*));
|
||||
taosArrayPush(pInfo->pBlockLists, &pReq);
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#include "tsimplehash.h"
|
||||
#include "taoserror.h"
|
||||
#include "tlog.h"
|
||||
|
||||
#define SHASH_DEFAULT_LOAD_FACTOR 0.75
|
||||
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
|
||||
|
@ -106,27 +107,27 @@ static SHNode *doCreateHashNode(const void *key, size_t keyLen, const void *data
|
|||
return pNewNode;
|
||||
}
|
||||
|
||||
static void taosHashTableResize(SSHashObj *pHashObj) {
|
||||
static void tSimpleHashTableResize(SSHashObj *pHashObj) {
|
||||
if (!SHASH_NEED_RESIZE(pHashObj)) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t newCapacity = (int32_t)(pHashObj->capacity << 1u);
|
||||
if (newCapacity > HASH_MAX_CAPACITY) {
|
||||
// uDebug("current capacity:%zu, maximum capacity:%d, no resize applied due to limitation is reached",
|
||||
// pHashObj->capacity, HASH_MAX_CAPACITY);
|
||||
uDebug("current capacity:%zu, maximum capacity:%" PRIu64 ", no resize applied due to limitation is reached",
|
||||
pHashObj->capacity, HASH_MAX_CAPACITY);
|
||||
return;
|
||||
}
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
void *pNewEntryList = taosMemoryRealloc(pHashObj->hashList, sizeof(void *) * newCapacity);
|
||||
if (!pNewEntryList) {
|
||||
// qWarn("hash resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity);
|
||||
uWarn("hash resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity);
|
||||
return;
|
||||
}
|
||||
|
||||
size_t inc = newCapacity - pHashObj->capacity;
|
||||
memset((char *)pNewEntryList + pHashObj->capacity * sizeof(void *), 0, inc);
|
||||
memset((char *)pNewEntryList + pHashObj->capacity * sizeof(void *), 0, inc * sizeof(void *));
|
||||
|
||||
pHashObj->hashList = pNewEntryList;
|
||||
pHashObj->capacity = newCapacity;
|
||||
|
@ -179,7 +180,7 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, cons
|
|||
|
||||
// need the resize process, write lock applied
|
||||
if (SHASH_NEED_RESIZE(pHashObj)) {
|
||||
taosHashTableResize(pHashObj);
|
||||
tSimpleHashTableResize(pHashObj);
|
||||
}
|
||||
|
||||
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
|
||||
|
|
|
@ -616,6 +616,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FILE_CORRUPTED, "Rsma file corrupted
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is messed up")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_SCHEMA, "Rsma invalid schema")
|
||||
|
||||
//index
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
|
||||
|
|
Loading…
Reference in New Issue