enh: rsma batch process
This commit is contained in:
parent
0ade372145
commit
32be8a71ee
|
@ -96,10 +96,10 @@ struct SRSmaStat {
|
|||
int8_t commitStat; // 0 not in committing, 1 in committing
|
||||
int8_t execStat; // 0 not in exec , 1 in exec
|
||||
SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w)
|
||||
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
|
||||
SHashObj *infoHash; // key: suid, value: SRSmaInfo
|
||||
SHashObj *fetchHash; // key: suid, value: L1 or L2 or L1|L2
|
||||
};
|
||||
|
||||
|
||||
struct SSmaStat {
|
||||
union {
|
||||
STSmaStat tsmaStat; // time-range-wise sma
|
||||
|
@ -108,13 +108,14 @@ struct SSmaStat {
|
|||
T_REF_DECLARE()
|
||||
};
|
||||
|
||||
#define SMA_STAT_TSMA(s) (&(s)->tsmaStat)
|
||||
#define SMA_STAT_RSMA(s) (&(s)->rsmaStat)
|
||||
#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash)
|
||||
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
|
||||
#define RSMA_COMMIT_STAT(r) (&(r)->commitStat)
|
||||
#define RSMA_REF_ID(r) ((r)->refId)
|
||||
#define RSMA_FS_LOCK(r) (&(r)->lock)
|
||||
#define SMA_STAT_TSMA(s) (&(s)->tsmaStat)
|
||||
#define SMA_STAT_RSMA(s) (&(s)->rsmaStat)
|
||||
#define RSMA_INFO_HASH(r) ((r)->infoHash)
|
||||
#define RSMA_FETCH_HASH(r) ((r)->fetchHash)
|
||||
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
|
||||
#define RSMA_COMMIT_STAT(r) (&(r)->commitStat)
|
||||
#define RSMA_REF_ID(r) ((r)->refId)
|
||||
#define RSMA_FS_LOCK(r) (&(r)->lock)
|
||||
|
||||
struct SRSmaInfoItem {
|
||||
int8_t level;
|
||||
|
@ -142,7 +143,7 @@ struct SRSmaInfo {
|
|||
#define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1)
|
||||
#define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1)
|
||||
#define RSMA_INFO_QTASK(r, i) ((r)->taskInfo[i])
|
||||
#define RSMA_INFO_IQTASK(r, i) ((r)->iTaskInfo[i])
|
||||
#define RSMA_INFO_IQTASK(r, i) ((r)->iTaskInfo[i])
|
||||
#define RSMA_INFO_ITEM(r, i) (&(r)->items[i])
|
||||
|
||||
enum {
|
||||
|
@ -167,6 +168,12 @@ enum {
|
|||
RSMA_RESTORE_SYNC = 2,
|
||||
};
|
||||
|
||||
typedef enum {
|
||||
RSMA_EXEC_OVERFLOW = 1, // triggered by queue buf overflow
|
||||
RSMA_EXEC_TIMEOUT = 2, // triggered by timer
|
||||
RSMA_EXEC_COMMIT = 3, // triggered by commit
|
||||
} ERsmaExecType;
|
||||
|
||||
void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
|
||||
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
|
||||
|
||||
|
@ -240,7 +247,7 @@ static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
|
|||
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
|
||||
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
|
||||
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
|
||||
int32_t tdRSmaProcessExecImpl(SSma *pSma, int8_t type);
|
||||
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
|
||||
|
||||
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
|
||||
int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer);
|
||||
|
|
|
@ -121,7 +121,7 @@ static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
|
||||
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
|
||||
SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat);
|
||||
|
||||
// step 1: set rsma stat paused
|
||||
|
@ -333,7 +333,34 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
|
|||
}
|
||||
}
|
||||
|
||||
// step 3: swap queue/qall and iQueue/iQal
|
||||
/**
|
||||
* @brief step 3: consume the SubmitReq in buffer
|
||||
* 1) This is high cost task and should not put in asyncPreCommit originally.
|
||||
* 2) But, if put in asyncCommit, would trigger taskInfo cloning frequently.
|
||||
*/
|
||||
nLoops = 0;
|
||||
smaInfo("vgId:%d, start to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
|
||||
|
||||
int8_t old;
|
||||
while (1) {
|
||||
old = atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1);
|
||||
if (old == 0) break;
|
||||
if (++nLoops > 1000) {
|
||||
sched_yield();
|
||||
nLoops = 0;
|
||||
smaDebug("vgId:%d, wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
|
||||
}
|
||||
}
|
||||
|
||||
smaInfo("vgId:%d, end to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
|
||||
|
||||
if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_COMMIT) < 0) {
|
||||
atomic_store_8(&pRSmaStat->execStat, 0);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
|
||||
// step 4: swap queue/qall and iQueue/iQall
|
||||
// lock
|
||||
taosWLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
|
||||
|
@ -351,11 +378,12 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
|
|||
}
|
||||
|
||||
atomic_store_64(&pRSmaStat->qBufSize, 0);
|
||||
|
||||
atomic_store_8(&pRSmaStat->execStat, 0);
|
||||
// unlock
|
||||
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
|
||||
// step 4: others
|
||||
|
||||
// step 5: others
|
||||
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -375,36 +403,17 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
|
|||
|
||||
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
|
||||
|
||||
// step 1: consume the SubmitReq in buffer
|
||||
int32_t nLoops = 0;
|
||||
smaDebug("vgId:%d start to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
|
||||
while (pRSmaStat->execStat == 1) {
|
||||
taosMsleep(15);
|
||||
if ((++nLoops & 63) == 0) {
|
||||
smaWarn("vgId:%d 1s waited for rsma exec stat = 0, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
|
||||
sched_yield();
|
||||
}
|
||||
}
|
||||
pRSmaStat->execStat = 1;
|
||||
smaDebug("vgId:%d end to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
|
||||
|
||||
if (tdRSmaProcessExecImpl(pSma, 1) < 0) {
|
||||
pRSmaStat->execStat = 0;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
// step 2: perform persist task for qTaskInfo operator
|
||||
// perform persist task for qTaskInfo operator
|
||||
if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) {
|
||||
pRSmaStat->execStat = 0;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
pRSmaStat->execStat = 0;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Migrate rsmaInfo from iRsmaInfo to rsmaInfo if rsmaInfoHash not empty.
|
||||
* @brief Migrate rsmaInfo from iRsmaInfo to rsmaInfo if rsma infoHash not empty.
|
||||
*
|
||||
* @param pSma
|
||||
* @return int32_t
|
||||
|
@ -424,13 +433,13 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
|
|||
|
||||
void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
|
||||
while (pIter) {
|
||||
tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
|
||||
tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
|
||||
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
|
||||
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
|
||||
int32_t refVal = T_REF_VAL_GET(pRSmaInfo);
|
||||
if (refVal == 0) {
|
||||
if(!rsmaDeleted) {
|
||||
if((rsmaDeleted = taosArrayInit(1, sizeof(tb_uid_t)))){
|
||||
if (!rsmaDeleted) {
|
||||
if ((rsmaDeleted = taosArrayInit(1, sizeof(tb_uid_t)))) {
|
||||
taosArrayPush(rsmaDeleted, pSuid);
|
||||
}
|
||||
}
|
||||
|
@ -461,22 +470,20 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
|
|||
pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(rsmaDeleted) > 0) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(rsmaDeleted); ++i) {
|
||||
tb_uid_t *pSuid = taosArrayGet(rsmaDeleted, i);
|
||||
void *pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t));
|
||||
if ((pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
|
||||
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
|
||||
smaDebug(
|
||||
"vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for "
|
||||
"table:%" PRIi64,
|
||||
SMA_VID(pSma), *pSuid);
|
||||
}
|
||||
taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t));
|
||||
for (int32_t i = 0; i < taosArrayGetSize(rsmaDeleted); ++i) {
|
||||
tb_uid_t *pSuid = taosArrayGet(rsmaDeleted, i);
|
||||
void *pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t));
|
||||
if ((pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
|
||||
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
|
||||
smaDebug(
|
||||
"vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for "
|
||||
"table:%" PRIi64,
|
||||
SMA_VID(pSma), *pSuid);
|
||||
}
|
||||
// remove suid in files
|
||||
taosArrayDestroy(rsmaDeleted);
|
||||
taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t));
|
||||
}
|
||||
taosArrayDestroy(rsmaDeleted);
|
||||
// TODO: remove suid in files?
|
||||
|
||||
// unlock
|
||||
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
|
|
|
@ -171,7 +171,7 @@ int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
|
|||
|
||||
int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
|
||||
if (!pRSmaInfo) return 0;
|
||||
|
||||
|
||||
int ref = T_REF_INC(pRSmaInfo);
|
||||
smaDebug("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
|
||||
return 0;
|
||||
|
@ -228,7 +228,12 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
|
|||
RSMA_INFO_HASH(pRSmaStat) = taosHashInit(
|
||||
RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
||||
if (!RSMA_INFO_HASH(pRSmaStat)) {
|
||||
taosMemoryFreeClear(*pSmaStat);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
RSMA_FETCH_HASH(pRSmaStat) = taosHashInit(
|
||||
RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
||||
if (!RSMA_FETCH_HASH(pRSmaStat)) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
} else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
|
||||
|
@ -274,7 +279,10 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
|
|||
}
|
||||
taosHashCleanup(RSMA_INFO_HASH(pStat));
|
||||
|
||||
// step 3: wait all triggered fetch tasks finished
|
||||
// step 3: destroy the rsma fetch hash
|
||||
taosHashCleanup(RSMA_FETCH_HASH(pStat));
|
||||
|
||||
// step 4: wait all triggered fetch tasks finished
|
||||
int32_t nLoops = 0;
|
||||
while (1) {
|
||||
if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) {
|
||||
|
@ -289,8 +297,8 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
|
|||
nLoops = 0;
|
||||
}
|
||||
}
|
||||
|
||||
// step 4: free pStat
|
||||
|
||||
// step 5: free pStat
|
||||
taosMemoryFreeClear(pStat);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,7 +17,8 @@
|
|||
|
||||
#define RSMA_QTASKINFO_BUFSIZE (32768)
|
||||
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
|
||||
#define RSMA_QTASKEXEC_BUFSIZ (1048576)
|
||||
#define RSMA_QTASKEXEC_BUFSIZE (1048576)
|
||||
#define RSMA_SUBMIT_BATCH_SIZE (1024)
|
||||
|
||||
SSmaMgmt smaMgmt = {
|
||||
.inited = 0,
|
||||
|
@ -35,9 +36,11 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUi
|
|||
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
|
||||
int8_t idx);
|
||||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo,
|
||||
int8_t type, int8_t level);
|
||||
ERsmaExecType type, int8_t level);
|
||||
static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid);
|
||||
static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
|
||||
static void tdFreeRSmaSubmitItems(SArray *pItems);
|
||||
static int32_t tdRSmaConsumeAndFetch(SSma *pSma, int64_t suid, int8_t level, SArray *pSubmitArr);
|
||||
static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
|
||||
int64_t suid);
|
||||
static void tdRSmaFetchTrigger(void *param, void *tmrId);
|
||||
|
@ -559,6 +562,14 @@ void *tdUidStoreFree(STbUidStore *pStore) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief The SubmitReq for rsma L2/L3 is inserted by tsdbInsertData method directly while not by WriteQ, as the queue
|
||||
* would be freed when close Vnode, thus lock should be used if with race condition.
|
||||
* @param pTsdb
|
||||
* @param version
|
||||
* @param pReq
|
||||
* @return int32_t
|
||||
*/
|
||||
static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
|
||||
if (!pReq) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
|
@ -566,7 +577,7 @@ static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
|
|||
}
|
||||
|
||||
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
|
||||
|
||||
// TODO: spin lock for race conditiond
|
||||
if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
@ -696,7 +707,7 @@ _err:
|
|||
}
|
||||
|
||||
/**
|
||||
* @brief Copy msg to rsmaQueueBuffer
|
||||
* @brief Copy msg to rsmaQueueBuffer for batch process
|
||||
*
|
||||
* @param pSma
|
||||
* @param pMsg
|
||||
|
@ -722,17 +733,17 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu
|
|||
int64_t bufSize = atomic_add_fetch_64(&pRSmaStat->qBufSize, pReq->header.contLen);
|
||||
|
||||
// smoothing consume
|
||||
int32_t n = bufSize / RSMA_QTASKEXEC_BUFSIZ;
|
||||
int32_t n = bufSize / RSMA_QTASKEXEC_BUFSIZE;
|
||||
if (n > 1) {
|
||||
if (n > 10) {
|
||||
n = 10;
|
||||
}
|
||||
taosMsleep(n << 4);
|
||||
if (n > 2) {
|
||||
smaWarn("vgId:%d pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma),
|
||||
smaWarn("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma),
|
||||
taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 4);
|
||||
} else {
|
||||
smaDebug("vgId:%d pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma),
|
||||
smaDebug("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma),
|
||||
taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 4);
|
||||
}
|
||||
}
|
||||
|
@ -751,7 +762,7 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
|
|||
if (pBlock == NULL) break;
|
||||
tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
|
||||
while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
|
||||
smaDebug("vgId:%d numOfRows:%d, suid:%" PRIi64 ", uid:%" PRIi64 ", version:%" PRIi64 ", ts:%" PRIi64,
|
||||
smaDebug("vgId:%d, numOfRows:%d, suid:%" PRIi64 ", uid:%" PRIi64 ", version:%" PRIi64 ", ts:%" PRIi64,
|
||||
SMA_VID(pSma), msgIter.numOfRows, msgIter.suid, msgIter.uid, pReq->version, row->ts);
|
||||
}
|
||||
}
|
||||
|
@ -771,10 +782,10 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
|
|||
* @return int32_t
|
||||
*/
|
||||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo,
|
||||
int8_t type, int8_t level) {
|
||||
ERsmaExecType type, int8_t level) {
|
||||
int32_t idx = level - 1;
|
||||
|
||||
void *qTaskInfo = (type == 0) ? RSMA_INFO_QTASK(pInfo, idx) : RSMA_INFO_IQTASK(pInfo, idx);
|
||||
void *qTaskInfo = (type == RSMA_EXEC_COMMIT) ? RSMA_INFO_IQTASK(pInfo, idx) : RSMA_INFO_QTASK(pInfo, idx);
|
||||
if (!qTaskInfo) {
|
||||
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level,
|
||||
pInfo->suid);
|
||||
|
@ -791,7 +802,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
|
|||
#if 0
|
||||
for (int32_t i = 0; i < msgSize; ++i) {
|
||||
SSubmitReq *pReq = *(SSubmitReq **)((char *)pMsg + i * sizeof(void *));
|
||||
smaDebug("vgId:%d [%d][%d] version %" PRIi64, SMA_VID(pSma), msgSize, i, pReq->version);
|
||||
smaDebug("vgId:%d, [%d][%d] version %" PRIi64, SMA_VID(pSma), msgSize, i, pReq->version);
|
||||
tdRsmaPrintSubmitReq(pSma, pReq);
|
||||
}
|
||||
#endif
|
||||
|
@ -802,11 +813,6 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
|
|||
|
||||
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
|
||||
tdRSmaFetchAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid);
|
||||
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
|
||||
|
||||
if (smaMgmt.tmrHandle) {
|
||||
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -854,13 +860,7 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
|
|||
}
|
||||
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
|
||||
if (RSMA_COMMIT_STAT(pStat) == 0) { // return NULL if not in committing stat
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// unlock
|
||||
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
return pRSmaInfo;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
|
||||
|
@ -890,6 +890,16 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputTyp
|
|||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
if (smaMgmt.tmrHandle) {
|
||||
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, 0);
|
||||
if (pItem->level > 0) {
|
||||
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
|
||||
}
|
||||
pItem = RSMA_INFO_ITEM(pRSmaInfo, 1);
|
||||
if (pItem->level > 0) {
|
||||
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
@ -898,51 +908,23 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputTyp
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
#if 0
|
||||
/**
|
||||
* @brief sync mode
|
||||
*
|
||||
* @param pSma
|
||||
* @param pMsg
|
||||
* @param inputType
|
||||
* @param suid
|
||||
* @return int32_t
|
||||
*/
|
||||
static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) {
|
||||
SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid);
|
||||
if (!pRSmaInfo) {
|
||||
smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
|
||||
tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo, suid, TSDB_RETENTION_L1);
|
||||
tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo, suid, TSDB_RETENTION_L2);
|
||||
}
|
||||
|
||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
#endif
|
||||
|
||||
static int32_t tdRSmaExecCheck(SSma *pSma) {
|
||||
SRSmaStat *pRsmaStat = SMA_RSMA_STAT(pSma);
|
||||
int64_t bufSize = atomic_load_64(&pRsmaStat->qBufSize);
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
|
||||
int64_t bufSize = atomic_load_64(&pRSmaStat->qBufSize);
|
||||
|
||||
if ((pRsmaStat->execStat == 1) || (bufSize < RSMA_QTASKEXEC_BUFSIZ)) {
|
||||
if (bufSize > RSMA_QTASKEXEC_BUFSIZ) {
|
||||
smaDebug("vgId:%d bufSize is %d but has no chance to exec as qTaskInfo occupied by another task", SMA_VID(pSma),
|
||||
bufSize);
|
||||
} else {
|
||||
smaDebug("vgId:%d bufSize is %d but has no chance to exec as less than %d", SMA_VID(pSma), bufSize,
|
||||
RSMA_QTASKEXEC_BUFSIZ);
|
||||
}
|
||||
if (bufSize < RSMA_QTASKEXEC_BUFSIZE) {
|
||||
smaDebug("vgId:%d, bufSize is %d but has no chance to exec as less than %d", SMA_VID(pSma), bufSize,
|
||||
RSMA_QTASKEXEC_BUFSIZE);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
smaDebug("vgId:%d bufSize is %d and has chance to exec as qTaskInfo is free now", SMA_VID(pSma), bufSize);
|
||||
if (atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1) == 1) {
|
||||
smaDebug("vgId:%d, bufSize is %d but has no chance to exec as qTaskInfo occupied by another task", SMA_VID(pSma),
|
||||
bufSize);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
pRsmaStat->execStat = 1;
|
||||
smaDebug("vgId:%d, bufSize is %d and has chance to exec as qTaskInfo is free now", SMA_VID(pSma), bufSize);
|
||||
|
||||
SRSmaExecMsg fetchMsg;
|
||||
int32_t contLen = sizeof(SMsgHead);
|
||||
|
@ -967,7 +949,7 @@ static int32_t tdRSmaExecCheck(SSma *pSma) {
|
|||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
pRsmaStat->execStat = 0;
|
||||
atomic_store_8(&pRSmaStat->execStat, 0);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
|
@ -1592,7 +1574,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
|||
}
|
||||
|
||||
_end:
|
||||
// taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
|
||||
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
|
||||
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
|
||||
}
|
||||
|
||||
|
@ -1656,13 +1638,11 @@ _err:
|
|||
* @return int32_t
|
||||
*/
|
||||
int32_t smaProcessFetch(SSma *pSma, void *pMsg) {
|
||||
SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg;
|
||||
SRSmaFetchMsg req = {0};
|
||||
SDecoder decoder = {0};
|
||||
void *pBuf = NULL;
|
||||
SRSmaInfo *pInfo = NULL;
|
||||
SRSmaInfoItem *pItem = NULL;
|
||||
|
||||
SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg;
|
||||
SRSmaFetchMsg req = {0};
|
||||
SDecoder decoder = {0};
|
||||
void *pBuf = NULL;
|
||||
SRSmaStat *pRSmaStat = NULL;
|
||||
if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) {
|
||||
terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP;
|
||||
goto _err;
|
||||
|
@ -1676,38 +1656,33 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
#if 0
|
||||
pInfo = tdAcquireRSmaInfoBySuid(pSma, req.suid);
|
||||
if (!pInfo) {
|
||||
if (terrno == TSDB_CODE_SUCCESS) {
|
||||
terrno = TSDB_CODE_RSMA_EMPTY_INFO;
|
||||
pRSmaStat = SMA_RSMA_STAT(pSma);
|
||||
|
||||
if (atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1) == 0) {
|
||||
SArray *pSubmitArr = NULL;
|
||||
if (!(pSubmitArr = taosArrayInit(RSMA_SUBMIT_BATCH_SIZE, POINTER_BYTES))) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
atomic_store_8(&pRSmaStat->execStat, 0);
|
||||
goto _err;
|
||||
}
|
||||
smaWarn("vgId:%d, failed to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8 " since %s", SMA_VID(pSma),
|
||||
req.suid, req.level, terrstr());
|
||||
goto _err;
|
||||
tdRSmaConsumeAndFetch(pSma, req.suid, req.level, pSubmitArr);
|
||||
atomic_store_8(&pRSmaStat->execStat, 0);
|
||||
taosArrayDestroy(pSubmitArr);
|
||||
} else {
|
||||
int8_t level = req.level;
|
||||
int8_t *val = taosHashGet(RSMA_FETCH_HASH(pRSmaStat), &req.suid, sizeof(req.suid));
|
||||
if (val) {
|
||||
level |= (*val);
|
||||
}
|
||||
ASSERT(level >= 1 && level <= 3);
|
||||
taosHashPut(RSMA_FETCH_HASH(pRSmaStat), &req.suid, sizeof(req.suid), &level, sizeof(level));
|
||||
}
|
||||
|
||||
pItem = RSMA_INFO_ITEM(pInfo, req.level - 1);
|
||||
|
||||
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
|
||||
qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, req.level - 1);
|
||||
if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
tdCleanupStreamInputDataBlock(taskInfo);
|
||||
|
||||
tdReleaseRSmaInfo(pSma, pInfo);
|
||||
#endif
|
||||
tDecoderClear(&decoder);
|
||||
smaDebug("vgId:%d, success to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma), req.suid,
|
||||
req.level);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
// tdReleaseRSmaInfo(pSma, pInfo);
|
||||
tDecoderClear(&decoder);
|
||||
smaError("vgId:%d, failed to process rsma fetch msg since %s", SMA_VID(pSma), terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
|
@ -1719,28 +1694,101 @@ static void tdFreeRSmaSubmitItems(SArray *pItems) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t tdRSmaConsumeAndFetch(SSma *pSma, int64_t suid, int8_t level, SArray *pSubmitArr) {
|
||||
SRSmaInfo *pInfo = tdAcquireRSmaInfoBySuid(pSma, suid);
|
||||
if (!pInfo) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// step 1: consume submit req
|
||||
int64_t qMemSize = 0;
|
||||
if ((qMemSize = taosQueueMemorySize(pInfo->queue) > 0)) {
|
||||
taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock
|
||||
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
|
||||
atomic_fetch_sub_64(&pRSmaStat->qBufSize, qMemSize);
|
||||
|
||||
taosArrayClear(pSubmitArr);
|
||||
|
||||
while (1) {
|
||||
void *msg = NULL;
|
||||
taosGetQitem(pInfo->qall, (void **)&msg);
|
||||
if (msg) {
|
||||
if (taosArrayPush(pSubmitArr, &msg) < 0) {
|
||||
tdFreeRSmaSubmitItems(pSubmitArr);
|
||||
goto _err;
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t size = taosArrayGetSize(pSubmitArr);
|
||||
if (size > 0) {
|
||||
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
|
||||
if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, RSMA_EXEC_TIMEOUT, i) <
|
||||
0) {
|
||||
tdFreeRSmaSubmitItems(pSubmitArr);
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
tdFreeRSmaSubmitItems(pSubmitArr);
|
||||
}
|
||||
}
|
||||
|
||||
// step 2: fetch rsma result
|
||||
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
|
||||
for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
|
||||
if (level & i) {
|
||||
qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, i - 1);
|
||||
if (!taskInfo) {
|
||||
continue;
|
||||
}
|
||||
if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1);
|
||||
if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, suid) < 0) {
|
||||
tdCleanupStreamInputDataBlock(taskInfo);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
tdCleanupStreamInputDataBlock(taskInfo);
|
||||
}
|
||||
}
|
||||
|
||||
_end:
|
||||
tdReleaseRSmaInfo(pSma, pInfo);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
tdReleaseRSmaInfo(pSma, pInfo);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief
|
||||
*
|
||||
* @param pSma
|
||||
* @param type 0 triggered when buffer overflow, 1 triggered by commit
|
||||
* @param type
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tdRSmaProcessExecImpl(SSma *pSma, int8_t type) {
|
||||
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
|
||||
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
|
||||
SHashObj *infoHash = NULL;
|
||||
SArray *pSubmitQArr = NULL;
|
||||
SArray *pSubmitArr = NULL;
|
||||
bool isFetchAll = false;
|
||||
|
||||
if (!pRSmaStat || !(infoHash = RSMA_INFO_HASH(pRSmaStat))) {
|
||||
terrno = TSDB_CODE_RSMA_INVALID_STAT;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (type == 0) {
|
||||
if (type == RSMA_EXEC_OVERFLOW) {
|
||||
taosRLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
if (atomic_load_64(&pRSmaStat->qBufSize) < RSMA_QTASKEXEC_BUFSIZ) {
|
||||
if (atomic_load_64(&pRSmaStat->qBufSize) < RSMA_QTASKEXEC_BUFSIZE) {
|
||||
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1752,19 +1800,19 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, int8_t type) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
if (!(pSubmitArr = taosArrayInit(1024, POINTER_BYTES))) {
|
||||
if (!(pSubmitArr = taosArrayInit(RSMA_SUBMIT_BATCH_SIZE, POINTER_BYTES))) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// step 1: rsma exec - consume data in buffer queue for all suids
|
||||
SRSmaExecQItem qItem = {0};
|
||||
taosWLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
void *pIter = taosHashIterate(infoHash, NULL);
|
||||
if (type == 0) {
|
||||
void *pIter = taosHashIterate(infoHash, NULL); // infoHash has r/w lock
|
||||
if (type == RSMA_EXEC_OVERFLOW) {
|
||||
while (pIter) {
|
||||
SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
|
||||
if (taosQueueItemSize(pInfo->queue)) {
|
||||
taosReadAllQitems(pInfo->queue, pInfo->qall);
|
||||
taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock
|
||||
qItem.qall = &pInfo->qall;
|
||||
qItem.pRSmaInfo = pIter;
|
||||
taosArrayPush(pSubmitQArr, &qItem);
|
||||
|
@ -1772,7 +1820,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, int8_t type) {
|
|||
ASSERT(taosQueueItemSize(pInfo->queue) == 0);
|
||||
pIter = taosHashIterate(infoHash, pIter);
|
||||
}
|
||||
} else if (type == 1) {
|
||||
} else if (type == RSMA_EXEC_COMMIT) {
|
||||
while (pIter) {
|
||||
SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
|
||||
if (taosQueueItemSize(pInfo->iQueue)) {
|
||||
|
@ -1788,7 +1836,6 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, int8_t type) {
|
|||
ASSERT(0);
|
||||
}
|
||||
atomic_store_64(&pRSmaStat->qBufSize, 0);
|
||||
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
|
||||
|
||||
int32_t qSize = taosArrayGetSize(pSubmitQArr);
|
||||
for (int32_t i = 0; i < qSize; ++i) {
|
||||
|
@ -1808,22 +1855,31 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, int8_t type) {
|
|||
|
||||
int32_t size = taosArrayGetSize(pSubmitArr);
|
||||
if (size > 0) {
|
||||
if (type == 0 || type == 1) {
|
||||
SRSmaInfo *pInfo = *(SRSmaInfo **)pItem->pRSmaInfo;
|
||||
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
|
||||
if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) {
|
||||
tdFreeRSmaSubmitItems(pSubmitArr);
|
||||
goto _err;
|
||||
}
|
||||
SRSmaInfo *pInfo = *(SRSmaInfo **)pItem->pRSmaInfo;
|
||||
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
|
||||
if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) {
|
||||
tdFreeRSmaSubmitItems(pSubmitArr);
|
||||
goto _err;
|
||||
}
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
tdFreeRSmaSubmitItems(pSubmitArr);
|
||||
taosArrayClear(pSubmitArr);
|
||||
}
|
||||
}
|
||||
|
||||
// step 2: rsma fetch - consume data in buffer queue for suids triggered by timer
|
||||
if (taosHashGetSize(RSMA_FETCH_HASH(pRSmaStat)) <= 0) {
|
||||
goto _end;
|
||||
}
|
||||
pIter = taosHashIterate(RSMA_FETCH_HASH(pRSmaStat), NULL);
|
||||
if (pIter) {
|
||||
tdRSmaConsumeAndFetch(pSma, *(int64_t *)taosHashGetKey(pIter, NULL), *(int8_t *)pIter, pSubmitArr);
|
||||
while ((pIter = taosHashIterate(RSMA_FETCH_HASH(pRSmaStat), pIter))) {
|
||||
tdRSmaConsumeAndFetch(pSma, *(int64_t *)taosHashGetKey(pIter, NULL), *(int8_t *)pIter, pSubmitArr);
|
||||
}
|
||||
}
|
||||
|
||||
_end:
|
||||
taosArrayDestroy(pSubmitArr);
|
||||
taosArrayDestroy(pSubmitQArr);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1842,23 +1898,23 @@ _err:
|
|||
*/
|
||||
int32_t smaProcessExec(SSma *pSma, void *pMsg) {
|
||||
SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg;
|
||||
SRSmaStat *pRsmaStat = SMA_RSMA_STAT(pSma);
|
||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
|
||||
|
||||
if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) {
|
||||
terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP;
|
||||
goto _err;
|
||||
}
|
||||
smaDebug("vgId:%d, begin to process rsma exec msg by thread:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
|
||||
if (tdRSmaProcessExecImpl(pSma, 0) < 0) {
|
||||
smaDebug("vgId:%d, begin to process rsma exec msg by TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
|
||||
if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_OVERFLOW) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pRsmaStat->execStat = 0;
|
||||
smaDebug("vgId:%d, success to process rsma exec msg by thead:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
|
||||
atomic_store_8(&pRSmaStat->execStat, 0);
|
||||
smaDebug("vgId:%d, success to process rsma exec msg by TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
pRsmaStat->execStat = 0;
|
||||
smaError("vgId:%d, failed to process rsma fetch msg by thread:%p since %s", SMA_VID(pSma),
|
||||
(void *)taosGetSelfPthreadId(), terrstr());
|
||||
atomic_store_8(&pRSmaStat->execStat, 0);
|
||||
smaError("vgId:%d, failed to process rsma exec msg by TID:%p since %s", SMA_VID(pSma), (void *)taosGetSelfPthreadId(),
|
||||
terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue