enh: rsma batch process

This commit is contained in:
Cary Xu 2022-08-19 17:19:50 +08:00
parent f0cdafd67a
commit 441ce21677
12 changed files with 253 additions and 332 deletions

View File

@ -2667,31 +2667,6 @@ typedef struct {
int32_t padding;
} SRSmaExecMsg;
typedef struct {
int64_t suid;
int8_t level;
} SRSmaFetchMsg;
static FORCE_INLINE int32_t tEncodeSRSmaFetchMsg(SEncoder* pCoder, const SRSmaFetchMsg* pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pReq->suid) < 0) return -1;
if (tEncodeI8(pCoder, pReq->level) < 0) return -1;
tEndEncode(pCoder);
return 0;
}
static FORCE_INLINE int32_t tDecodeSRSmaFetchMsg(SDecoder* pCoder, SRSmaFetchMsg* pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->level) < 0) return -1;
tEndDecode(pCoder);
return 0;
}
typedef struct {
int8_t version; // for compatibility(default 0)
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX

View File

@ -201,7 +201,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp)
TD_DEF_MSG_TYPE(TDMT_VND_FETCH_RSMA, "vnode-fetch-rsma", SRSmaFetchMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_FETCH_RSMA, "vnode-fetch-rsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_EXEC_RSMA, "vnode-exec-rsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_BATCH_DEL, "batch-delete", SBatchDeleteReq, NULL)

View File

@ -76,6 +76,7 @@ void taosFreeQall(STaosQall *qall);
int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall);
int32_t taosGetQitem(STaosQall *qall, void **ppItem);
void taosResetQitems(STaosQall *qall);
int32_t taosQallItemSize(STaosQall *qall);
STaosQset *taosOpenQset();
void taosCloseQset(STaosQset *qset);

View File

@ -442,6 +442,8 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
if (req.rollup) {
req.rsmaParam.maxdelay[0] = pStb->maxdelay[0];
req.rsmaParam.maxdelay[1] = pStb->maxdelay[1];
req.rsmaParam.watermark[0] = pStb->watermark[0];
req.rsmaParam.watermark[1] = pStb->watermark[1];
if (pStb->ast1Len > 0) {
if (mndConvertRsmaTask(&req.rsmaParam.qmsg[0], &req.rsmaParam.qmsgLen[0], pStb->pAst1, pStb->uid,
STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[0]) < 0) {

View File

@ -90,14 +90,14 @@ struct SRSmaStat {
SSma *pSma;
int64_t commitAppliedVer; // vnode applied version for async commit
int64_t refId; // shared by fetch tasks
volatile int64_t qBufSize; // queue buffer size
volatile int64_t nBufItems; // number of items in queue buffer
SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo)
volatile int8_t nExecutor; // [1, max(half of query threads, 4)]
int8_t triggerStat; // shared by fetch tasks
int8_t commitStat; // 0 not in committing, 1 in committing
int8_t execStat; // 0 not in exec , 1 in exec
SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w)
SHashObj *infoHash; // key: suid, value: SRSmaInfo
SHashObj *fetchHash; // key: suid, value: L1 or L2 or L1|L2
tsem_t notEmpty; // has items in queue buffer
};
struct SSmaStat {
@ -111,26 +111,29 @@ struct SSmaStat {
#define SMA_STAT_TSMA(s) (&(s)->tsmaStat)
#define SMA_STAT_RSMA(s) (&(s)->rsmaStat)
#define RSMA_INFO_HASH(r) ((r)->infoHash)
#define RSMA_FETCH_HASH(r) ((r)->fetchHash)
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_COMMIT_STAT(r) (&(r)->commitStat)
#define RSMA_REF_ID(r) ((r)->refId)
#define RSMA_FS_LOCK(r) (&(r)->lock)
struct SRSmaInfoItem {
int8_t level;
int8_t triggerStat;
uint16_t interval; // second
int32_t maxDelay;
tmr_h tmrId;
int8_t level;
int8_t triggerStat;
uint8_t nSkipped; // number of skipped to fetch data from all active window
int8_t fetchLevel;
int32_t maxDelay; // ms
int64_t lastFetch; // ms
tmr_h tmrId;
};
struct SRSmaInfo {
STSchema *pTSchema;
int64_t suid;
int64_t refId; // refId of SRSmaStat
uint64_t delFlag : 1;
uint64_t lastReceived : 63; // second
int64_t refId; // refId of SRSmaStat
int64_t lastRecv; // ms
int8_t delFlag;
int8_t assigned; // 0 idle, 1 assgined for exec
int16_t padding;
T_REF_DECLARE()
SRSmaInfoItem items[TSDB_RETENTION_L2];
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t

View File

@ -198,7 +198,6 @@ int32_t smaAsyncPreCommit(SSma* pSma);
int32_t smaAsyncCommit(SSma* pSma);
int32_t smaAsyncPostCommit(SSma* pSma);
int32_t smaDoRetention(SSma* pSma, int64_t now);
int32_t smaProcessFetch(SSma* pSma, void* pMsg);
int32_t smaProcessExec(SSma* pSma, void* pMsg);
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);

View File

@ -321,10 +321,10 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
int32_t nLoops = 0;
while (1) {
if (T_REF_VAL_GET(pStat) == 0) {
smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma));
smaDebug("vgId:%d, rsma commit, fetch tasks all finished", SMA_VID(pSma));
break;
} else {
smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma));
smaDebug("vgId:%d, rsma commit, fetch tasks not all finished yet", SMA_VID(pSma));
}
++nLoops;
if (nLoops > 1000) {
@ -338,30 +338,25 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
* 1) This is high cost task and should not put in asyncPreCommit originally.
* 2) But, if put in asyncCommit, would trigger taskInfo cloning frequently.
*/
nLoops = 0;
smaInfo("vgId:%d, start to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
int8_t old;
while (1) {
old = atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1);
if (old == 0) break;
if (++nLoops > 1000) {
sched_yield();
nLoops = 0;
smaDebug("vgId:%d, wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
}
}
smaInfo("vgId:%d, end to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_COMMIT) < 0) {
atomic_store_8(&pRSmaStat->execStat, 0);
return TSDB_CODE_FAILED;
}
smaInfo("vgId:%d, rsma commit, wait for all items to be consumed, TID:%p", SMA_VID(pSma), (void*)taosGetSelfPthreadId());
nLoops = 0;
while (atomic_load_64(&pRSmaStat->nBufItems) > 0) {
++nLoops;
if (nLoops > 1000) {
sched_yield();
nLoops = 0;
}
}
smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void*)taosGetSelfPthreadId());
#if 0 // consuming task of qTaskInfo clone
// step 4: swap queue/qall and iQueue/iQall
// lock
taosWLockLatch(SMA_ENV_LOCK(pEnv));
// taosWLockLatch(SMA_ENV_LOCK(pEnv));
ASSERT(RSMA_INFO_HASH(pRSmaStat));
@ -376,11 +371,9 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
}
atomic_store_64(&pRSmaStat->qBufSize, 0);
atomic_store_8(&pRSmaStat->execStat, 0);
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
// taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
#endif
// step 5: others
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
@ -426,7 +419,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
// step 1: merge qTaskInfo and iQTaskInfo
// lock
taosWLockLatch(SMA_ENV_LOCK(pEnv));
// taosWLockLatch(SMA_ENV_LOCK(pEnv));
void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
while (pIter) {
@ -480,10 +473,9 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t));
}
taosArrayDestroy(rsmaDeleted);
// TODO: remove suid in files?
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
// taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
// step 2: cleanup outdated qtaskinfo files
tdCleanupQTaskInfoFiles(pSma, pRSmaStat);

View File

@ -209,6 +209,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat);
pRSmaStat->pSma = (SSma *)pSma;
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT);
tsem_init(&pRSmaStat->notEmpty, 0, 0);
// init smaMgmt
smaInit();
@ -230,12 +231,6 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
if (!RSMA_INFO_HASH(pRSmaStat)) {
return TSDB_CODE_FAILED;
}
RSMA_FETCH_HASH(pRSmaStat) = taosHashInit(
RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (!RSMA_FETCH_HASH(pRSmaStat)) {
return TSDB_CODE_FAILED;
}
} else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
// TODO
} else {
@ -267,6 +262,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
smaDebug("vgId:%d, destroy rsma stat %p", SMA_VID(pSma), pRSmaStat);
// step 1: set rsma trigger stat cancelled
atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED);
tsem_destroy(&(pStat->notEmpty));
// step 2: destroy the rsma info and associated fetch tasks
if (taosHashGetSize(RSMA_INFO_HASH(pStat)) > 0) {
@ -279,10 +275,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
}
taosHashCleanup(RSMA_INFO_HASH(pStat));
// step 3: destroy the rsma fetch hash
taosHashCleanup(RSMA_FETCH_HASH(pStat));
// step 4: wait all triggered fetch tasks finished
// step 3: wait all triggered fetch tasks finished
int32_t nLoops = 0;
while (1) {
if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) {

View File

@ -15,10 +15,14 @@
#include "sma.h"
#define RSMA_QTASKINFO_BUFSIZE (32768)
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
#define RSMA_QTASKEXEC_BUFSIZE (1048576)
#define RSMA_SUBMIT_BATCH_SIZE (1024)
#define RSMA_QTASKINFO_BUFSIZE (32768) // size
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
#define RSMA_EXECUTOR_MAX (4) // cnt
#define RSMA_FETCH_DELAY_MAX (1800000) // ms
#define RSMA_FETCH_SKIP_MAX (1000) // cnt
#define RSMA_FETCH_ACTIVE_MAX (1800) // ms
SSmaMgmt smaMgmt = {
.inited = 0,
@ -40,11 +44,10 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSiz
static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid);
static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
static void tdFreeRSmaSubmitItems(SArray *pItems);
static int32_t tdRSmaConsumeAndFetch(SSma *pSma, int64_t suid, int8_t level, SArray *pSubmitArr);
static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr);
static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid);
static void tdRSmaFetchTrigger(void *param, void *tmrId);
static int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level);
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter);
@ -668,7 +671,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm
} else {
smaDebug("vgId:%d, rsma %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level);
}
#if 0
#if 1
char flag[10] = {0};
snprintf(flag, 10, "level %" PRIi8, pItem->level);
blockDebugShowDataBlocks(pResList, flag);
@ -729,22 +732,24 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu
taosWriteQitem(pInfo->queue, qItem);
pInfo->lastRecv = taosGetTimestampMs();
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
int64_t bufSize = atomic_add_fetch_64(&pRSmaStat->qBufSize, pReq->header.contLen);
tsem_post(&(pRSmaStat->notEmpty));
int64_t nItems = atomic_fetch_add_64(&pRSmaStat->nBufItems, 1);
// smoothing consume
int32_t n = bufSize / RSMA_QTASKEXEC_BUFSIZE;
int32_t n = nItems / RSMA_QTASKEXEC_SMOOTH_SIZE;
if (n > 1) {
if (n > 10) {
n = 10;
}
taosMsleep(n << 4);
if (n > 2) {
taosMsleep(n << 3);
if (n > 5) {
smaWarn("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma),
taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 4);
} else {
smaDebug("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma),
taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 4);
taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 3);
}
}
@ -840,25 +845,25 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
return NULL;
}
taosRLockLatch(SMA_ENV_LOCK(pEnv));
// taosRLockLatch(SMA_ENV_LOCK(pEnv));
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
if (pRSmaInfo && (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
// taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL;
}
if (!pRSmaInfo->taskInfo[0]) {
if (tdCloneRSmaInfo(pSma, pRSmaInfo) < 0) {
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
// taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL;
}
}
tdRefRSmaInfo(pSma, pRSmaInfo);
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
// taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
ASSERT(pRSmaInfo->suid == suid);
return pRSmaInfo;
}
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
// taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL;
}
@ -910,22 +915,11 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputTyp
static int32_t tdRSmaExecCheck(SSma *pSma) {
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
int64_t bufSize = atomic_load_64(&pRSmaStat->qBufSize);
if (bufSize < RSMA_QTASKEXEC_BUFSIZE) {
smaDebug("vgId:%d, bufSize is %d but has no chance to exec as less than %d", SMA_VID(pSma), bufSize,
RSMA_QTASKEXEC_BUFSIZE);
if (atomic_load_8(&pRSmaStat->nExecutor) >= TMIN(RSMA_EXECUTOR_MAX, tsNumOfVnodeQueryThreads / 2)) {
return TSDB_CODE_SUCCESS;
}
if (atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1) == 1) {
smaDebug("vgId:%d, bufSize is %d but has no chance to exec as qTaskInfo occupied by another task", SMA_VID(pSma),
bufSize);
return TSDB_CODE_SUCCESS;
}
smaDebug("vgId:%d, bufSize is %d and has chance to exec as qTaskInfo is free now", SMA_VID(pSma), bufSize);
SRSmaExecMsg fetchMsg;
int32_t contLen = sizeof(SMsgHead);
void *pBuf = rpcMallocCont(0 + contLen);
@ -949,7 +943,6 @@ static int32_t tdRSmaExecCheck(SSma *pSma) {
return TSDB_CODE_SUCCESS;
_err:
atomic_store_8(&pRSmaStat->execStat, 0);
return TSDB_CODE_FAILED;
}
@ -959,7 +952,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
// only applicable when rsma env exists
return TSDB_CODE_SUCCESS;
}
STbUidStore uidStore = {0};
SRetention *pRetention = SMA_RETENTION(pSma);
if (!RETENTION_VALID(pRetention + 1)) {
// return directly if retention level 1 is invalid
@ -967,25 +960,35 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
}
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
STbUidStore uidStore = {0};
tdFetchSubmitReqSuids(pMsg, &uidStore);
if (tdFetchSubmitReqSuids(pMsg, &uidStore) < 0) {
goto _err;
}
if (uidStore.suid != 0) {
tdExecuteRSmaAsync(pSma, pMsg, inputType, uidStore.suid);
if (tdExecuteRSmaAsync(pSma, pMsg, inputType, uidStore.suid) < 0) {
goto _err;
}
void *pIter = taosHashIterate(uidStore.uidHash, NULL);
while (pIter) {
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
tdExecuteRSmaAsync(pSma, pMsg, inputType, *pTbSuid);
if (tdExecuteRSmaAsync(pSma, pMsg, inputType, *pTbSuid) < 0) {
goto _err;
}
pIter = taosHashIterate(uidStore.uidHash, pIter);
}
tdUidStoreDestory(&uidStore);
tdRSmaExecCheck(pSma);
if (tdRSmaExecCheck(pSma) < 0) {
goto _err;
}
}
}
tdUidStoreDestory(&uidStore);
return TSDB_CODE_SUCCESS;
_err:
tdUidStoreDestory(&uidStore);
smaError("vgId:%d, failed to process rsma submit since: %s", SMA_VID(pSma), terrstr());
return TSDB_CODE_FAILED;
}
/**
@ -1521,8 +1524,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
if (!pStat) {
smaDebug("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)",
smaMgmt.rsetId, pRSmaInfo->refId);
smaWarn("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId,
pRSmaInfo->refId);
return;
}
@ -1553,7 +1556,14 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
smaDebug("vgId:%d, rsma fetch task started for level:%" PRIi8 " suid:%" PRIi64 " since stat is active",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
// async process
tdRSmaFetchSend(pSma, pRSmaInfo, pItem->level);
pItem->fetchLevel = pItem->level;
SRSmaInfo *qInfo = tdAcquireRSmaInfoBySuid(pSma, pRSmaInfo->suid);
SRSmaInfoItem *qItem = RSMA_INFO_ITEM(qInfo, pItem->level - 1);
ASSERT(qItem->level == pItem->level);
ASSERT(qItem->fetchLevel == pItem->fetchLevel);
tsem_post(&(pStat->notEmpty));
smaInfo("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64, SMA_VID(pSma), pItem->level,
pRSmaInfo->suid);
} break;
case TASK_TRIGGER_STAT_PAUSED: {
smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is paused",
@ -1568,8 +1578,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
} break;
default: {
smaWarn("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is unknown",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is unknown",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
} break;
}
@ -1578,135 +1588,21 @@ _end:
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
}
/**
* @brief put rsma fetch msg to fetch queue
*
* @param pSma
* @param pInfo
* @param level
* @return int32_t
*/
static int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) {
SRSmaFetchMsg fetchMsg = {.suid = pInfo->suid, .level = level};
int32_t ret = 0;
int32_t contLen = 0;
SEncoder encoder = {0};
tEncodeSize(tEncodeSRSmaFetchMsg, &fetchMsg, contLen, ret);
if (ret < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tEncoderClear(&encoder);
goto _err;
}
void *pBuf = rpcMallocCont(contLen + sizeof(SMsgHead));
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), contLen);
if (tEncodeSRSmaFetchMsg(&encoder, &fetchMsg) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tEncoderClear(&encoder);
}
tEncoderClear(&encoder);
((SMsgHead *)pBuf)->vgId = SMA_VID(pSma);
((SMsgHead *)pBuf)->contLen = contLen + sizeof(SMsgHead);
SRpcMsg rpcMsg = {
.code = 0,
.msgType = TDMT_VND_FETCH_RSMA,
.pCont = pBuf,
.contLen = contLen + sizeof(SMsgHead),
};
if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, QUERY_QUEUE, &rpcMsg)) != 0) {
smaError("vgId:%d, failed to put rsma fetch msg into fetch-queue for suid:%" PRIi64 " level:%" PRIi8 " since %s",
SMA_VID(pSma), pInfo->suid, level, terrstr());
goto _err;
}
smaDebug("vgId:%d, success to put rsma fetch msg into fetch-queue for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma),
pInfo->suid, level);
return TSDB_CODE_SUCCESS;
_err:
return TSDB_CODE_FAILED;
}
/**
* @brief fetch rsma data of level 2/3 and submit
*
* @param pSma
* @param pMsg
* @return int32_t
*/
int32_t smaProcessFetch(SSma *pSma, void *pMsg) {
SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg;
SRSmaFetchMsg req = {0};
SDecoder decoder = {0};
void *pBuf = NULL;
SRSmaStat *pRSmaStat = NULL;
if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) {
terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP;
goto _err;
}
pBuf = POINTER_SHIFT(pRpcMsg->pCont, sizeof(SMsgHead));
tDecoderInit(&decoder, pBuf, pRpcMsg->contLen);
if (tDecodeSRSmaFetchMsg(&decoder, &req) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _err;
}
pRSmaStat = SMA_RSMA_STAT(pSma);
if (atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1) == 0) {
SArray *pSubmitArr = NULL;
if (!(pSubmitArr = taosArrayInit(RSMA_SUBMIT_BATCH_SIZE, POINTER_BYTES))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
atomic_store_8(&pRSmaStat->execStat, 0);
goto _err;
}
tdRSmaConsumeAndFetch(pSma, req.suid, req.level, pSubmitArr);
atomic_store_8(&pRSmaStat->execStat, 0);
taosArrayDestroy(pSubmitArr);
} else {
int8_t level = req.level;
int8_t *val = taosHashGet(RSMA_FETCH_HASH(pRSmaStat), &req.suid, sizeof(req.suid));
if (val) {
level |= (*val);
}
ASSERT(level >= 1 && level <= 3);
taosHashPut(RSMA_FETCH_HASH(pRSmaStat), &req.suid, sizeof(req.suid), &level, sizeof(level));
}
tDecoderClear(&decoder);
smaDebug("vgId:%d, success to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma), req.suid,
req.level);
return TSDB_CODE_SUCCESS;
_err:
tDecoderClear(&decoder);
smaError("vgId:%d, failed to process rsma fetch msg since %s", SMA_VID(pSma), terrstr());
return TSDB_CODE_FAILED;
}
static void tdFreeRSmaSubmitItems(SArray *pItems) {
for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) {
taosFreeQitem(*(void **)taosArrayGet(pItems, i));
}
}
static int32_t tdRSmaConsumeAndFetch(SSma *pSma, int64_t suid, int8_t level, SArray *pSubmitArr) {
SRSmaInfo *pInfo = tdAcquireRSmaInfoBySuid(pSma, suid);
if (!pInfo) {
return TSDB_CODE_SUCCESS;
}
static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr) {
// step 1: consume submit req
#if 0
int64_t qMemSize = 0;
if ((qMemSize = taosQueueMemorySize(pInfo->queue) > 0)) {
taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
atomic_fetch_sub_64(&pRSmaStat->qBufSize, qMemSize);
atomic_fetch_sub_64(&pRSmaStat->nBufItems, taosQallItemSize(pInfo->qall));
taosArrayClear(pSubmitArr);
@ -1736,25 +1632,52 @@ static int32_t tdRSmaConsumeAndFetch(SSma *pSma, int64_t suid, int8_t level, SAr
tdFreeRSmaSubmitItems(pSubmitArr);
}
}
// step 2: fetch rsma result
#endif
// step 2: fetch rsma result(consider the efficiency and functionality)
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
if (level & i) {
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1);
if (pItem->fetchLevel) {
pItem->fetchLevel = 0;
qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, i - 1);
if (!taskInfo) {
continue;
}
int64_t curMs = taosGetTimestampMs();
if ((pItem->nSkipped > RSMA_FETCH_SKIP_MAX) || (pItem->nSkipped * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) {
pItem->nSkipped = 0;
smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch executed",
SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay);
} else {
if (((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX)) {
++pItem->nSkipped;
smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ",
SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
continue;
} else {
smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ",
SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
}
}
pItem->lastFetch = curMs;
if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) {
goto _err;
}
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1);
if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, suid) < 0) {
if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) {
tdCleanupStreamInputDataBlock(taskInfo);
goto _err;
}
tdCleanupStreamInputDataBlock(taskInfo);
smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch finished",
SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay);
} else {
smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8
" maxDelay:%d, fetch not executed as fetchLevel is %" PRIi8,
SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay, pItem->fetchLevel);
}
}
@ -1766,6 +1689,45 @@ _err:
return TSDB_CODE_FAILED;
}
static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) {
taosArrayClear(pSubmitArr);
while (1) {
void *msg = NULL;
taosGetQitem(qall, (void **)&msg);
if (msg) {
if (taosArrayPush(pSubmitArr, &msg) < 0) {
tdFreeRSmaSubmitItems(pSubmitArr);
goto _err;
}
} else {
break;
}
}
int32_t size = taosArrayGetSize(pSubmitArr);
if (size > 0) {
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) {
tdFreeRSmaSubmitItems(pSubmitArr);
goto _err;
}
}
tdFreeRSmaSubmitItems(pSubmitArr);
}
return TSDB_CODE_SUCCESS;
_err:
while (1) {
void *msg = NULL;
taosGetQitem(qall, (void **)&msg);
if (msg) {
taosFreeQitem(msg);
} else {
break;
}
}
return TSDB_CODE_FAILED;
}
/**
* @brief
*
@ -1777,7 +1739,6 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SHashObj *infoHash = NULL;
SArray *pSubmitQArr = NULL;
SArray *pSubmitArr = NULL;
bool isFetchAll = false;
@ -1786,106 +1747,95 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
goto _err;
}
if (type == RSMA_EXEC_OVERFLOW) {
taosRLockLatch(SMA_ENV_LOCK(pEnv));
if (atomic_load_64(&pRSmaStat->qBufSize) < RSMA_QTASKEXEC_BUFSIZE) {
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return TSDB_CODE_SUCCESS;
}
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
}
if (!(pSubmitQArr = taosArrayInit(taosHashGetSize(infoHash), sizeof(SRSmaExecQItem)))) {
if (!(pSubmitArr =
taosArrayInit(TMIN(RSMA_SUBMIT_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), POINTER_BYTES))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
if (!(pSubmitArr = taosArrayInit(RSMA_SUBMIT_BATCH_SIZE, POINTER_BYTES))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
// step 1: rsma exec - consume data in buffer queue for all suids
SRSmaExecQItem qItem = {0};
void *pIter = taosHashIterate(infoHash, NULL); // infoHash has r/w lock
if (type == RSMA_EXEC_OVERFLOW) {
while (pIter) {
SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
if (taosQueueItemSize(pInfo->queue)) {
taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock
qItem.qall = &pInfo->qall;
qItem.pRSmaInfo = pIter;
taosArrayPush(pSubmitQArr, &qItem);
}
ASSERT(taosQueueItemSize(pInfo->queue) == 0);
pIter = taosHashIterate(infoHash, pIter);
}
} else if (type == RSMA_EXEC_COMMIT) {
while (pIter) {
SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
if (taosQueueItemSize(pInfo->iQueue)) {
taosReadAllQitems(pInfo->iQueue, pInfo->iQall);
qItem.qall = &pInfo->iQall;
qItem.pRSmaInfo = pIter;
taosArrayPush(pSubmitQArr, &qItem);
}
ASSERT(taosQueueItemSize(pInfo->iQueue) == 0);
pIter = taosHashIterate(infoHash, pIter);
}
} else {
ASSERT(0);
}
atomic_store_64(&pRSmaStat->qBufSize, 0);
int32_t qSize = taosArrayGetSize(pSubmitQArr);
for (int32_t i = 0; i < qSize; ++i) {
SRSmaExecQItem *pItem = taosArrayGet(pSubmitQArr, i);
while (1) {
void *msg = NULL;
taosGetQitem(*(STaosQall **)pItem->qall, (void **)&msg);
if (msg) {
if (taosArrayPush(pSubmitArr, &msg) < 0) {
tdFreeRSmaSubmitItems(pSubmitArr);
goto _err;
}
int32_t nIdle = 0;
while (true) {
if (++nIdle > 100) {
if (atomic_fetch_sub_8(&pRSmaStat->nExecutor, 1) > 1) {
// free the exec thread if without SubmitReq
break;
} else {
// keep at least 1 exec thread only if without SubmitReq in case of no query thread to use when busy again
atomic_add_fetch_8(&pRSmaStat->nExecutor, 1);
nIdle = 0;
}
}
// step 1: rsma exec - consume data in buffer queue for all suids
if (type == RSMA_EXEC_OVERFLOW || type == RSMA_EXEC_COMMIT) {
void *pIter = taosHashIterate(infoHash, NULL); // infoHash has r/w lock
while (pIter) {
SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
int64_t itemSize = 0;
if ((itemSize = taosQueueItemSize(pInfo->queue)) || RSMA_INFO_ITEM(pInfo, 0)->fetchLevel ||
RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) {
smaDebug("vgId:%d queueItemSize is %" PRIi64 " execType:%" PRIi8, SMA_VID(pSma), itemSize, type);
if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) {
taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock
int32_t qallItemSize = taosQallItemSize(pInfo->qall);
if (qallItemSize > 0) {
tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type);
}
tdRSmaConsumeAndFetch(pSma, pInfo, pSubmitArr);
if (qallItemSize > 0) {
// subtract the item size after the task finished, commit should wait for all items be consumed
atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize);
nIdle = 0;
}
ASSERT(1 == atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0));
}
}
pIter = taosHashIterate(infoHash, pIter);
}
if (type == RSMA_EXEC_COMMIT) {
break;
}
}
#if 0
else if (type == RSMA_EXEC_COMMIT) {
while (pIter) {
SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
if (taosQueueItemSize(pInfo->iQueue)) {
if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) {
taosReadAllQitems(pInfo->iQueue, pInfo->iQall); // queue has mutex lock
int32_t qallItemSize = taosQallItemSize(pInfo->iQall);
if (qallItemSize > 0) {
atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize);
nIdle = 0;
int32_t size = taosArrayGetSize(pSubmitArr);
if (size > 0) {
SRSmaInfo *pInfo = *(SRSmaInfo **)pItem->pRSmaInfo;
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) {
tdFreeRSmaSubmitItems(pSubmitArr);
goto _err;
// batch exec
tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type);
}
// tdRSmaConsumeAndFetch(pSma, pInfo, pSubmitArr);
ASSERT(1 == atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0));
}
}
ASSERT(taosQueueItemSize(pInfo->iQueue) == 0);
pIter = taosHashIterate(infoHash, pIter);
}
tdFreeRSmaSubmitItems(pSubmitArr);
taosArrayClear(pSubmitArr);
break;
}
#endif
else {
ASSERT(0);
}
}
// step 2: rsma fetch - consume data in buffer queue for suids triggered by timer
if (taosHashGetSize(RSMA_FETCH_HASH(pRSmaStat)) <= 0) {
goto _end;
}
pIter = taosHashIterate(RSMA_FETCH_HASH(pRSmaStat), NULL);
if (pIter) {
tdRSmaConsumeAndFetch(pSma, *(int64_t *)taosHashGetKey(pIter, NULL), *(int8_t *)pIter, pSubmitArr);
while ((pIter = taosHashIterate(RSMA_FETCH_HASH(pRSmaStat), pIter))) {
tdRSmaConsumeAndFetch(pSma, *(int64_t *)taosHashGetKey(pIter, NULL), *(int8_t *)pIter, pSubmitArr);
if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) {
tsem_wait(&pRSmaStat->notEmpty);
}
}
} // end of while(true)
_end:
taosArrayDestroy(pSubmitArr);
taosArrayDestroy(pSubmitQArr);
return TSDB_CODE_SUCCESS;
_err:
taosArrayDestroy(pSubmitArr);
taosArrayDestroy(pSubmitQArr);
return TSDB_CODE_FAILED;
}
@ -1905,15 +1855,21 @@ int32_t smaProcessExec(SSma *pSma, void *pMsg) {
goto _err;
}
smaDebug("vgId:%d, begin to process rsma exec msg by TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_OVERFLOW) < 0) {
goto _err;
int8_t nOld = atomic_fetch_add_8(&pRSmaStat->nExecutor, 1);
if (nOld < TMIN(RSMA_EXECUTOR_MAX, tsNumOfVnodeQueryThreads / 2)) {
if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_OVERFLOW) < 0) {
goto _err;
}
} else {
atomic_fetch_sub_8(&pRSmaStat->nExecutor, 1);
}
atomic_store_8(&pRSmaStat->execStat, 0);
smaDebug("vgId:%d, success to process rsma exec msg by TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
return TSDB_CODE_SUCCESS;
_err:
atomic_store_8(&pRSmaStat->execStat, 0);
atomic_fetch_sub_8(&pRSmaStat->nExecutor, 1);
smaError("vgId:%d, failed to process rsma exec msg by TID:%p since %s", SMA_VID(pSma), (void *)taosGetSelfPthreadId(),
terrstr());
return TSDB_CODE_FAILED;

View File

@ -301,8 +301,6 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
case TDMT_SCH_QUERY_CONTINUE:
return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
case TDMT_VND_FETCH_RSMA:
return smaProcessFetch(pVnode->pSma, pMsg);
case TDMT_VND_EXEC_RSMA:
return smaProcessExec(pVnode->pSma, pMsg);
default:

View File

@ -3131,6 +3131,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
initResultRow(resultRow);
pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
// releaseBufPage(pSup->pResultBuf, getBufPage(pSup->pResultBuf, pageId));
}
if (offset != length) {

View File

@ -298,7 +298,8 @@ int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
return num;
}
void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
int32_t taosQallItemSize(STaosQall *qall) { return qall->numOfItems; }
STaosQset *taosOpenQset() {
STaosQset *qset = taosMemoryCalloc(sizeof(STaosQset), 1);