enh: rsma batch process

This commit is contained in:
Cary Xu 2022-08-16 20:22:45 +08:00
parent 7216384d2f
commit d170adf214
7 changed files with 169 additions and 68 deletions

View File

@ -240,7 +240,7 @@ static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaProcessExecImpl(SSma *pSma); int32_t tdRSmaProcessExecImpl(SSma *pSma, int8_t type);
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer); int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer);

View File

@ -65,6 +65,7 @@ struct SVBufPool {
SVBufPool* next; SVBufPool* next;
SVnode* pVnode; SVnode* pVnode;
volatile int32_t nRef; volatile int32_t nRef;
TdThreadSpinlock lock;
int64_t size; int64_t size;
uint8_t* ptr; uint8_t* ptr;
SVBufPoolNode* pTail; SVBufPoolNode* pTail;

View File

@ -297,10 +297,9 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
} }
/** /**
* @brief Rsma async commit implementation * @brief Rsma async commit implementation(only do some necessary light weighted task)
* 1) set rsma stat TASK_TRIGGER_STAT_PAUSED * 1) set rsma stat TASK_TRIGGER_STAT_PAUSED
* 2) Wait all running fetch task finish to fetch and put submitMsg into level 2/3 wQueue(blocking level 1 write) * 2) Wait all running fetch task finish to fetch and put submitMsg into level 2/3 wQueue(blocking level 1 write)
* 3)
* *
* @param pSma * @param pSma
* @return int32_t * @return int32_t
@ -334,12 +333,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
} }
} }
// step 3: consume the SubmitReq in buffer // step 3: swap queue/qall and iQueue/iQal
if (tdRSmaProcessExecImpl(pSma) < 0) {
return TSDB_CODE_FAILED;
}
// step 4: swap rsmaInfoHash and iRsmaInfoHash
// lock // lock
taosWLockLatch(SMA_ENV_LOCK(pEnv)); taosWLockLatch(SMA_ENV_LOCK(pEnv));
@ -379,9 +373,32 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// perform persist task for qTaskInfo
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
// step 1: consume the SubmitReq in buffer
int32_t nLoops = 0;
smaDebug("vgId:%d start to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
while (pRSmaStat->execStat == 1) {
taosMsleep(15);
if ((++nLoops & 63) == 0) {
smaWarn("vgId:%d 1s waited for rsma exec stat = 0, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
sched_yield();
}
}
pRSmaStat->execStat = 1;
smaDebug("vgId:%d end to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
if (tdRSmaProcessExecImpl(pSma, 1) < 0) {
pRSmaStat->execStat = 0;
return TSDB_CODE_FAILED;
}
// step 2: perform persist task for qTaskInfo operator
if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) {
pRSmaStat->execStat = 0;
return TSDB_CODE_FAILED;
}
pRSmaStat->execStat = 0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -15,9 +15,9 @@
#include "sma.h" #include "sma.h"
#define RSMA_QTASKINFO_BUFSIZE 32768 #define RSMA_QTASKINFO_BUFSIZE (32768)
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
#define RSMA_QTASKEXEC_BUFSIZ 10 * 1048576 // 8 MB #define RSMA_QTASKEXEC_BUFSIZ (1048576)
SSmaMgmt smaMgmt = { SSmaMgmt smaMgmt = {
.inited = 0, .inited = 0,
@ -35,7 +35,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUi
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
int8_t idx); int8_t idx);
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo, static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo,
tb_uid_t suid, int8_t level); int8_t type, int8_t level);
static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid); static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid);
static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
@ -600,17 +600,6 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
return 0; return 0;
} }
static void tdDestroySDataBlockArray(SArray *pArray) {
// TODO
#if 0
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
SSDataBlock *pDataBlock = taosArrayGet(pArray, i);
blockDestroyInner(pDataBlock);
}
#endif
taosArrayDestroy(pArray);
}
/** /**
* @brief retention of rsma1/rsma2 * @brief retention of rsma1/rsma2
* *
@ -668,8 +657,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm
} else { } else {
smaDebug("vgId:%d, rsma %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level); smaDebug("vgId:%d, rsma %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level);
} }
#if 0
#if 1
char flag[10] = {0}; char flag[10] = {0};
snprintf(flag, 10, "level %" PRIi8, pItem->level); snprintf(flag, 10, "level %" PRIi8, pItem->level);
blockDebugShowDataBlocks(pResList, flag); blockDebugShowDataBlocks(pResList, flag);
@ -731,11 +719,45 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu
taosWriteQitem(pInfo->queue, qItem); taosWriteQitem(pInfo->queue, qItem);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
atomic_fetch_add_64(&pRSmaStat->qBufSize, taosQueueMemorySize(pInfo->queue)); int64_t bufSize = atomic_add_fetch_64(&pRSmaStat->qBufSize, pReq->header.contLen);
// smoothing consume
int32_t n = bufSize / RSMA_QTASKEXEC_BUFSIZ;
if (n > 1) {
if (n > 10) {
n = 10;
}
taosMsleep(n << 4);
if (n > 2) {
smaWarn("vgId:%d pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma),
taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 4);
} else {
smaDebug("vgId:%d pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma),
taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 4);
}
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
SSubmitMsgIter msgIter = {0};
SSubmitBlkIter blkIter = {0};
STSRow *row = NULL;
if (tInitSubmitMsgIter(pReq, &msgIter) < 0) return -1;
while (true) {
SSubmitBlk *pBlock = NULL;
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (pBlock == NULL) break;
tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
smaDebug("vgId:%d numOfRows:%d, suid:%" PRIi64 ", uid:%" PRIi64 ", version:%" PRIi64 ", ts:%" PRIi64,
SMA_VID(pSma), msgIter.numOfRows, msgIter.suid, msgIter.uid, pReq->version, row->ts);
}
}
return 0;
}
/** /**
* @brief sync mode * @brief sync mode
* *
@ -744,32 +766,42 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu
* @param msgSize * @param msgSize
* @param inputType * @param inputType
* @param pInfo * @param pInfo
* @param suid * @param type
* @param level * @param level
* @return int32_t * @return int32_t
*/ */
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo, static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo,
tb_uid_t suid, int8_t level) { int8_t type, int8_t level) {
int32_t idx = level - 1; int32_t idx = level - 1;
if (!pInfo || !RSMA_INFO_QTASK(pInfo, idx)) {
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid); void *qTaskInfo = (type == 0) ? RSMA_INFO_QTASK(pInfo, idx) : RSMA_INFO_IQTASK(pInfo, idx);
if (!qTaskInfo) {
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level,
pInfo->suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (!pInfo->pTSchema) { if (!pInfo->pTSchema) {
smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid); smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, pInfo->suid);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level,
RSMA_INFO_QTASK(pInfo, idx), suid); RSMA_INFO_QTASK(pInfo, idx), pInfo->suid);
if (qSetMultiStreamInput(RSMA_INFO_QTASK(pInfo, idx), pMsg, msgSize, inputType) < 0) { // INPUT__DATA_SUBMIT #if 0
for (int32_t i = 0; i < msgSize; ++i) {
SSubmitReq *pReq = *(SSubmitReq **)((char *)pMsg + i * sizeof(void *));
smaDebug("vgId:%d [%d][%d] version %" PRIi64, SMA_VID(pSma), msgSize, i, pReq->version);
tdRsmaPrintSubmitReq(pSma, pReq);
}
#endif
if (qSetMultiStreamInput(qTaskInfo, pMsg, msgSize, inputType) < 0) {
smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
tdRSmaFetchAndSubmitResult(pSma, RSMA_INFO_QTASK(pInfo, idx), pItem, pInfo->pTSchema, suid); tdRSmaFetchAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid);
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
if (smaMgmt.tmrHandle) { if (smaMgmt.tmrHandle) {
@ -858,6 +890,8 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputTyp
tdReleaseRSmaInfo(pSma, pRSmaInfo); tdReleaseRSmaInfo(pSma, pRSmaInfo);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
} else {
ASSERT(0);
} }
tdReleaseRSmaInfo(pSma, pRSmaInfo); tdReleaseRSmaInfo(pSma, pRSmaInfo);
@ -896,9 +930,18 @@ static int32_t tdRSmaExecCheck(SSma *pSma) {
int64_t bufSize = atomic_load_64(&pRsmaStat->qBufSize); int64_t bufSize = atomic_load_64(&pRsmaStat->qBufSize);
if ((pRsmaStat->execStat == 1) || (bufSize < RSMA_QTASKEXEC_BUFSIZ)) { if ((pRsmaStat->execStat == 1) || (bufSize < RSMA_QTASKEXEC_BUFSIZ)) {
if (bufSize > RSMA_QTASKEXEC_BUFSIZ) {
smaDebug("vgId:%d bufSize is %d but has no chance to exec as qTaskInfo occupied by another task", SMA_VID(pSma),
bufSize);
} else {
smaDebug("vgId:%d bufSize is %d but has no chance to exec as less than %d", SMA_VID(pSma), bufSize,
RSMA_QTASKEXEC_BUFSIZ);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
smaDebug("vgId:%d bufSize is %d and has chance to exec as qTaskInfo is free now", SMA_VID(pSma), bufSize);
pRsmaStat->execStat = 1; pRsmaStat->execStat = 1;
SRSmaExecMsg fetchMsg; SRSmaExecMsg fetchMsg;
@ -1633,6 +1676,7 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) {
goto _err; goto _err;
} }
#if 0
pInfo = tdAcquireRSmaInfoBySuid(pSma, req.suid); pInfo = tdAcquireRSmaInfoBySuid(pSma, req.suid);
if (!pInfo) { if (!pInfo) {
if (terrno == TSDB_CODE_SUCCESS) { if (terrno == TSDB_CODE_SUCCESS) {
@ -1657,12 +1701,13 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) {
tdCleanupStreamInputDataBlock(taskInfo); tdCleanupStreamInputDataBlock(taskInfo);
tdReleaseRSmaInfo(pSma, pInfo); tdReleaseRSmaInfo(pSma, pInfo);
#endif
tDecoderClear(&decoder); tDecoderClear(&decoder);
smaDebug("vgId:%d, success to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma), req.suid, smaDebug("vgId:%d, success to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma), req.suid,
req.level); req.level);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
tdReleaseRSmaInfo(pSma, pInfo); // tdReleaseRSmaInfo(pSma, pInfo);
tDecoderClear(&decoder); tDecoderClear(&decoder);
smaError("vgId:%d, failed to process rsma fetch msg since %s", SMA_VID(pSma), terrstr()); smaError("vgId:%d, failed to process rsma fetch msg since %s", SMA_VID(pSma), terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
@ -1674,7 +1719,14 @@ static void tdFreeRSmaSubmitItems(SArray *pItems) {
} }
} }
int32_t tdRSmaProcessExecImpl(SSma *pSma) { /**
* @brief
*
* @param pSma
* @param type 0 triggered when buffer overflow, 1 triggered by commit
* @return int32_t
*/
int32_t tdRSmaProcessExecImpl(SSma *pSma, int8_t type) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SHashObj *infoHash = NULL; SHashObj *infoHash = NULL;
@ -1686,12 +1738,14 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma) {
goto _err; goto _err;
} }
if (type == 0) {
taosRLockLatch(SMA_ENV_LOCK(pEnv)); taosRLockLatch(SMA_ENV_LOCK(pEnv));
if (atomic_load_64(&pRSmaStat->qBufSize) < RSMA_QTASKEXEC_BUFSIZ) { if (atomic_load_64(&pRSmaStat->qBufSize) < RSMA_QTASKEXEC_BUFSIZ) {
taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
}
if (!(pSubmitQArr = taosArrayInit(taosHashGetSize(infoHash), sizeof(SRSmaExecQItem)))) { if (!(pSubmitQArr = taosArrayInit(taosHashGetSize(infoHash), sizeof(SRSmaExecQItem)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -1706,6 +1760,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma) {
SRSmaExecQItem qItem = {0}; SRSmaExecQItem qItem = {0};
taosWLockLatch(SMA_ENV_LOCK(pEnv)); taosWLockLatch(SMA_ENV_LOCK(pEnv));
void *pIter = taosHashIterate(infoHash, NULL); void *pIter = taosHashIterate(infoHash, NULL);
if (type == 0) {
while (pIter) { while (pIter) {
SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
if (taosQueueItemSize(pInfo->queue)) { if (taosQueueItemSize(pInfo->queue)) {
@ -1717,7 +1772,21 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma) {
ASSERT(taosQueueItemSize(pInfo->queue) == 0); ASSERT(taosQueueItemSize(pInfo->queue) == 0);
pIter = taosHashIterate(infoHash, pIter); pIter = taosHashIterate(infoHash, pIter);
} }
} else if (type == 1) {
while (pIter) {
SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
if (taosQueueItemSize(pInfo->iQueue)) {
taosReadAllQitems(pInfo->iQueue, pInfo->iQall);
qItem.qall = &pInfo->iQall;
qItem.pRSmaInfo = pIter;
taosArrayPush(pSubmitQArr, &qItem);
}
ASSERT(taosQueueItemSize(pInfo->iQueue) == 0);
pIter = taosHashIterate(infoHash, pIter);
}
} else {
ASSERT(0);
}
atomic_store_64(&pRSmaStat->qBufSize, 0); atomic_store_64(&pRSmaStat->qBufSize, 0);
taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
@ -1739,13 +1808,17 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma) {
int32_t size = taosArrayGetSize(pSubmitArr); int32_t size = taosArrayGetSize(pSubmitArr);
if (size > 0) { if (size > 0) {
if (type == 0 || type == 1) {
SRSmaInfo *pInfo = *(SRSmaInfo **)pItem->pRSmaInfo; SRSmaInfo *pInfo = *(SRSmaInfo **)pItem->pRSmaInfo;
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, pInfo->suid, i) < 0) { if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) {
tdFreeRSmaSubmitItems(pSubmitArr); tdFreeRSmaSubmitItems(pSubmitArr);
goto _err; goto _err;
} }
} }
} else {
ASSERT(0);
}
tdFreeRSmaSubmitItems(pSubmitArr); tdFreeRSmaSubmitItems(pSubmitArr);
taosArrayClear(pSubmitArr); taosArrayClear(pSubmitArr);
} }
@ -1775,16 +1848,17 @@ int32_t smaProcessExec(SSma *pSma, void *pMsg) {
terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP; terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP;
goto _err; goto _err;
} }
smaDebug("vgId:%d, begin to process rsma exec msg by thread:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
if (tdRSmaProcessExecImpl(pSma) < 0) { if (tdRSmaProcessExecImpl(pSma, 0) < 0) {
goto _err; goto _err;
} }
pRsmaStat->execStat = 0; pRsmaStat->execStat = 0;
smaWarn("vgId:%d, success to process rsma exec msg", SMA_VID(pSma)); smaDebug("vgId:%d, success to process rsma exec msg by thead:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
pRsmaStat->execStat = 0; pRsmaStat->execStat = 0;
smaError("vgId:%d, failed to process rsma fetch msg since %s", SMA_VID(pSma), terrstr()); smaError("vgId:%d, failed to process rsma fetch msg by thread:%p since %s", SMA_VID(pSma),
(void *)taosGetSelfPthreadId(), terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }

View File

@ -78,7 +78,7 @@ void vnodeBufPoolReset(SVBufPool *pPool) {
void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
SVBufPoolNode *pNode; SVBufPoolNode *pNode;
void *p; void *p;
taosThreadSpinLock(&pPool->lock);
if (pPool->node.size >= pPool->ptr - pPool->node.data + size) { if (pPool->node.size >= pPool->ptr - pPool->node.data + size) {
// allocate from the anchor node // allocate from the anchor node
p = pPool->ptr; p = pPool->ptr;
@ -89,6 +89,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
pNode = taosMemoryMalloc(sizeof(*pNode) + size); pNode = taosMemoryMalloc(sizeof(*pNode) + size);
if (pNode == NULL) { if (pNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
taosThreadSpinUnlock(&pPool->lock);
return NULL; return NULL;
} }
@ -101,7 +102,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
pPool->size = pPool->size + sizeof(*pNode) + size; pPool->size = pPool->size + sizeof(*pNode) + size;
} }
taosThreadSpinUnlock(&pPool->lock);
return p; return p;
} }
@ -129,6 +130,12 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool)
return -1; return -1;
} }
if (taosThreadSpinInit(&pPool->lock, 0) != 0) {
taosMemoryFree(pPool);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pPool->next = NULL; pPool->next = NULL;
pPool->pVnode = pVnode; pPool->pVnode = pVnode;
pPool->nRef = 0; pPool->nRef = 0;
@ -145,6 +152,7 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool)
static int vnodeBufPoolDestroy(SVBufPool *pPool) { static int vnodeBufPoolDestroy(SVBufPool *pPool) {
vnodeBufPoolReset(pPool); vnodeBufPoolReset(pPool);
taosThreadSpinDestroy(&pPool->lock);
taosMemoryFree(pPool); taosMemoryFree(pPool);
return 0; return 0;
} }

View File

@ -220,9 +220,6 @@ int vnodeCommit(SVnode *pVnode) {
vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID, vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID,
pVnode->state.applied); pVnode->state.applied);
vnodeBufPoolUnRef(pVnode->inUse);
pVnode->inUse = NULL;
pVnode->state.commitTerm = pVnode->state.applyTerm; pVnode->state.commitTerm = pVnode->state.applyTerm;
// save info // save info
@ -248,7 +245,7 @@ int vnodeCommit(SVnode *pVnode) {
} }
if (VND_IS_RSMA(pVnode)) { if (VND_IS_RSMA(pVnode)) {
smaAsyncCommit(pVnode->pSma); smaAsyncCommit(pVnode->pSma); // would write L2/L3 data into BufPool
if (tsdbCommit(VND_RSMA0(pVnode)) < 0) { if (tsdbCommit(VND_RSMA0(pVnode)) < 0) {
ASSERT(0); ASSERT(0);
@ -269,6 +266,9 @@ int vnodeCommit(SVnode *pVnode) {
} }
} }
vnodeBufPoolUnRef(pVnode->inUse);
pVnode->inUse = NULL;
if (tqCommit(pVnode->pTq) < 0) { if (tqCommit(pVnode->pTq) < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;

View File

@ -1174,6 +1174,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
TSKEY* tsCol = (TSKEY*)pColDataInfo->pData; TSKEY* tsCol = (TSKEY*)pColDataInfo->pData;
bool inserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.uid);
for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) { for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
@ -1183,7 +1184,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC); win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
isClosed = isCloseWindow(&win, &pInfo->twAggSup); isClosed = isCloseWindow(&win, &pInfo->twAggSup);
} }
bool inserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.uid);
// must check update info first. // must check update info first.
bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]); bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]);
bool closedWin = isClosed && inserted && isSignleIntervalWindow(pInfo) && bool closedWin = isClosed && inserted && isSignleIntervalWindow(pInfo) &&