enh: rsma batch process
This commit is contained in:
parent
59739e0cb7
commit
a9fcc12c33
|
@ -87,10 +87,14 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
while (pVnode->refCount > 0) taosMsleep(10);
|
while (pVnode->refCount > 0) taosMsleep(10);
|
||||||
dTrace("vgId:%d, wait for vnode queue is empty", pVnode->vgId);
|
dTrace("vgId:%d, wait for vnode queue is empty", pVnode->vgId);
|
||||||
|
|
||||||
|
|
||||||
while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10);
|
while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10);
|
||||||
while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10);
|
while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10);
|
||||||
while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10);
|
while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10);
|
||||||
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
|
while (!taosQueueEmpty(pVnode->pQueryQ)) {
|
||||||
|
taosMsleep(10);
|
||||||
|
dInfo("prop:vgId:%d, query queue size is %d", pVnode->vgId, taosQueueItemSize(pVnode->pQueryQ));
|
||||||
|
}
|
||||||
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
|
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
|
||||||
while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);
|
while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);
|
||||||
dTrace("vgId:%d, vnode queue is empty", pVnode->vgId);
|
dTrace("vgId:%d, vnode queue is empty", pVnode->vgId);
|
||||||
|
|
|
@ -32,7 +32,8 @@ extern "C" {
|
||||||
#define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
#define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
#define RSMA_TASK_INFO_HASH_SLOT 8
|
#define RSMA_TASK_INFO_HASH_SLOT (8)
|
||||||
|
#define RSMA_EXECUTOR_MAX (4)
|
||||||
|
|
||||||
typedef struct SSmaEnv SSmaEnv;
|
typedef struct SSmaEnv SSmaEnv;
|
||||||
typedef struct SSmaStat SSmaStat;
|
typedef struct SSmaStat SSmaStat;
|
||||||
|
|
|
@ -189,6 +189,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
|
||||||
int32_t smaInit();
|
int32_t smaInit();
|
||||||
void smaCleanUp();
|
void smaCleanUp();
|
||||||
int32_t smaOpen(SVnode* pVnode);
|
int32_t smaOpen(SVnode* pVnode);
|
||||||
|
int32_t smaPreClose(SSma* pSma);
|
||||||
int32_t smaClose(SSma* pSma);
|
int32_t smaClose(SSma* pSma);
|
||||||
int32_t smaBegin(SSma* pSma);
|
int32_t smaBegin(SSma* pSma);
|
||||||
int32_t smaSyncPreCommit(SSma* pSma);
|
int32_t smaSyncPreCommit(SSma* pSma);
|
||||||
|
@ -322,10 +323,12 @@ struct SVnode {
|
||||||
TdThreadMutex lock;
|
TdThreadMutex lock;
|
||||||
bool blocked;
|
bool blocked;
|
||||||
bool restored;
|
bool restored;
|
||||||
|
bool inClose;
|
||||||
tsem_t syncSem;
|
tsem_t syncSem;
|
||||||
SQHandle* pQuery;
|
SQHandle* pQuery;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
#define TD_VID(PVNODE) ((PVNODE)->config.vgId)
|
#define TD_VID(PVNODE) ((PVNODE)->config.vgId)
|
||||||
|
|
||||||
#define VND_TSDB(vnd) ((vnd)->pTsdb)
|
#define VND_TSDB(vnd) ((vnd)->pTsdb)
|
||||||
|
|
|
@ -146,6 +146,17 @@ int32_t smaClose(SSma *pSma) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t smaPreClose(SSma *pSma) {
|
||||||
|
if (pSma && VND_IS_RSMA(pSma->pVnode)) {
|
||||||
|
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
|
||||||
|
for (int32_t i = 0; i < RSMA_EXECUTOR_MAX; ++i) {
|
||||||
|
tsem_post(&(pRSmaStat->notEmpty));
|
||||||
|
}
|
||||||
|
smaInfo("prop:vgId:%d post notEmtpy", SMA_VID(pSma));
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief rsma env restore
|
* @brief rsma env restore
|
||||||
*
|
*
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
|
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
|
||||||
#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt
|
#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt
|
||||||
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
|
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
|
||||||
#define RSMA_EXECUTOR_MAX (4) // cnt
|
|
||||||
#define RSMA_FETCH_DELAY_MAX (1800000) // ms
|
#define RSMA_FETCH_DELAY_MAX (1800000) // ms
|
||||||
#define RSMA_FETCH_SKIP_MAX (1000) // cnt
|
#define RSMA_FETCH_SKIP_MAX (1000) // cnt
|
||||||
#define RSMA_FETCH_ACTIVE_MAX (1800) // ms
|
#define RSMA_FETCH_ACTIVE_MAX (1800) // ms
|
||||||
|
@ -671,7 +670,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm
|
||||||
} else {
|
} else {
|
||||||
smaDebug("vgId:%d, rsma %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level);
|
smaDebug("vgId:%d, rsma %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level);
|
||||||
}
|
}
|
||||||
#if 1
|
#if 0
|
||||||
char flag[10] = {0};
|
char flag[10] = {0};
|
||||||
snprintf(flag, 10, "level %" PRIi8, pItem->level);
|
snprintf(flag, 10, "level %" PRIi8, pItem->level);
|
||||||
blockDebugShowDataBlocks(pResList, flag);
|
blockDebugShowDataBlocks(pResList, flag);
|
||||||
|
@ -1736,6 +1735,7 @@ _err:
|
||||||
* @return int32_t
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
|
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
|
||||||
|
SVnode *pVnode = pSma->pVnode;
|
||||||
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||||
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
|
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
|
||||||
SHashObj *infoHash = NULL;
|
SHashObj *infoHash = NULL;
|
||||||
|
@ -1753,18 +1753,9 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t nIdle = 0;
|
bool isBusy = false;
|
||||||
while (true) {
|
while (true) {
|
||||||
if (++nIdle > 100) {
|
isBusy = false;
|
||||||
if (atomic_fetch_sub_8(&pRSmaStat->nExecutor, 1) > 1) {
|
|
||||||
// free the exec thread if without SubmitReq
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
// keep at least 1 exec thread only if without SubmitReq in case of no query thread to use when busy again
|
|
||||||
atomic_add_fetch_8(&pRSmaStat->nExecutor, 1);
|
|
||||||
nIdle = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// step 1: rsma exec - consume data in buffer queue for all suids
|
// step 1: rsma exec - consume data in buffer queue for all suids
|
||||||
if (type == RSMA_EXEC_OVERFLOW || type == RSMA_EXEC_COMMIT) {
|
if (type == RSMA_EXEC_OVERFLOW || type == RSMA_EXEC_COMMIT) {
|
||||||
void *pIter = taosHashIterate(infoHash, NULL); // infoHash has r/w lock
|
void *pIter = taosHashIterate(infoHash, NULL); // infoHash has r/w lock
|
||||||
|
@ -1785,7 +1776,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
|
||||||
if (qallItemSize > 0) {
|
if (qallItemSize > 0) {
|
||||||
// subtract the item size after the task finished, commit should wait for all items be consumed
|
// subtract the item size after the task finished, commit should wait for all items be consumed
|
||||||
atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize);
|
atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize);
|
||||||
nIdle = 0;
|
isBusy = true;
|
||||||
}
|
}
|
||||||
ASSERT(1 == atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0));
|
ASSERT(1 == atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0));
|
||||||
}
|
}
|
||||||
|
@ -1826,8 +1817,25 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
smaInfo("prop:vgId:%d loop end check", SMA_VID(pSma));
|
||||||
if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) {
|
if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) {
|
||||||
|
if (pVnode->inClose) {
|
||||||
|
smaInfo("prop:vgId:%d loop end check - inClose and break", SMA_VID(pSma));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
smaInfo("prop:vgId:%d loop end check - wait for notEmpty", SMA_VID(pSma));
|
||||||
tsem_wait(&pRSmaStat->notEmpty);
|
tsem_wait(&pRSmaStat->notEmpty);
|
||||||
|
smaInfo("prop:vgId:%d loop end check - received notEmpty", SMA_VID(pSma));
|
||||||
|
if (pVnode->inClose && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) {
|
||||||
|
smaInfo("prop:vgId:%d loop end check - break - inClose:%d, nBufItems:%" PRIi64, SMA_VID(pSma), pVnode->inClose,
|
||||||
|
atomic_load_64(&pRSmaStat->nBufItems));
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
smaInfo("prop:vgId:%d loop end check - continue - inClose:%d, nBufItems:%" PRIi64, SMA_VID(pSma),
|
||||||
|
pVnode->inClose, atomic_load_64(&pRSmaStat->nBufItems));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
smaInfo("prop:vgId:%d loop end check - continue to run", SMA_VID(pSma));
|
||||||
}
|
}
|
||||||
} // end of while(true)
|
} // end of while(true)
|
||||||
|
|
||||||
|
|
|
@ -87,6 +87,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
||||||
pVnode->msgCb = msgCb;
|
pVnode->msgCb = msgCb;
|
||||||
taosThreadMutexInit(&pVnode->lock, NULL);
|
taosThreadMutexInit(&pVnode->lock, NULL);
|
||||||
pVnode->blocked = false;
|
pVnode->blocked = false;
|
||||||
|
pVnode->inClose = false;
|
||||||
|
|
||||||
tsem_init(&pVnode->syncSem, 0, 0);
|
tsem_init(&pVnode->syncSem, 0, 0);
|
||||||
tsem_init(&(pVnode->canCommit), 0, 1);
|
tsem_init(&(pVnode->canCommit), 0, 1);
|
||||||
|
@ -181,6 +182,8 @@ _err:
|
||||||
void vnodePreClose(SVnode *pVnode) {
|
void vnodePreClose(SVnode *pVnode) {
|
||||||
if (pVnode) {
|
if (pVnode) {
|
||||||
syncLeaderTransfer(pVnode->sync);
|
syncLeaderTransfer(pVnode->sync);
|
||||||
|
pVnode->inClose = true;
|
||||||
|
smaPreClose(pVnode->pSma);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -35,6 +35,7 @@ sleep 7000
|
||||||
print =============== select * from retention level 2 from memory
|
print =============== select * from retention level 2 from memory
|
||||||
sql select * from ct1;
|
sql select * from ct1;
|
||||||
print $data00 $data01 $data02
|
print $data00 $data01 $data02
|
||||||
|
print $data10 $data11 $data12
|
||||||
if $rows > 2 then
|
if $rows > 2 then
|
||||||
print retention level 2 file rows $rows > 2
|
print retention level 2 file rows $rows > 2
|
||||||
return -1
|
return -1
|
||||||
|
@ -51,6 +52,7 @@ endi
|
||||||
print =============== select * from retention level 1 from memory
|
print =============== select * from retention level 1 from memory
|
||||||
sql select * from ct1 where ts > now-8d;
|
sql select * from ct1 where ts > now-8d;
|
||||||
print $data00 $data01 $data02
|
print $data00 $data01 $data02
|
||||||
|
print $data10 $data11 $data12
|
||||||
if $rows > 2 then
|
if $rows > 2 then
|
||||||
print retention level 1 file rows $rows > 2
|
print retention level 1 file rows $rows > 2
|
||||||
return -1
|
return -1
|
||||||
|
@ -89,6 +91,7 @@ system sh/exec.sh -n dnode1 -s start
|
||||||
print =============== select * from retention level 2 from file
|
print =============== select * from retention level 2 from file
|
||||||
sql select * from ct1;
|
sql select * from ct1;
|
||||||
print $data00 $data01 $data02
|
print $data00 $data01 $data02
|
||||||
|
print $data10 $data11 $data12
|
||||||
if $rows > 2 then
|
if $rows > 2 then
|
||||||
print retention level 2 file rows $rows > 2
|
print retention level 2 file rows $rows > 2
|
||||||
return -1
|
return -1
|
||||||
|
@ -104,6 +107,7 @@ endi
|
||||||
print =============== select * from retention level 1 from file
|
print =============== select * from retention level 1 from file
|
||||||
sql select * from ct1 where ts > now-8d;
|
sql select * from ct1 where ts > now-8d;
|
||||||
print $data00 $data01 $data02
|
print $data00 $data01 $data02
|
||||||
|
print $data10 $data11 $data12
|
||||||
if $rows > 2 then
|
if $rows > 2 then
|
||||||
print retention level 1 file rows $rows > 2
|
print retention level 1 file rows $rows > 2
|
||||||
return -1
|
return -1
|
||||||
|
@ -141,6 +145,7 @@ sleep 7000
|
||||||
print =============== select * from retention level 2 from file and memory after rsma qtaskinfo recovery
|
print =============== select * from retention level 2 from file and memory after rsma qtaskinfo recovery
|
||||||
sql select * from ct1;
|
sql select * from ct1;
|
||||||
print $data00 $data01 $data02
|
print $data00 $data01 $data02
|
||||||
|
print $data10 $data11 $data12
|
||||||
if $rows > 2 then
|
if $rows > 2 then
|
||||||
print retention level 2 file/mem rows $rows > 2
|
print retention level 2 file/mem rows $rows > 2
|
||||||
return -1
|
return -1
|
||||||
|
@ -163,6 +168,7 @@ endi
|
||||||
print =============== select * from retention level 1 from file and memory after rsma qtaskinfo recovery
|
print =============== select * from retention level 1 from file and memory after rsma qtaskinfo recovery
|
||||||
sql select * from ct1 where ts > now-8d;
|
sql select * from ct1 where ts > now-8d;
|
||||||
print $data00 $data01 $data02
|
print $data00 $data01 $data02
|
||||||
|
print $data10 $data11 $data12
|
||||||
if $rows > 2 then
|
if $rows > 2 then
|
||||||
print retention level 1 file/mem rows $rows > 2
|
print retention level 1 file/mem rows $rows > 2
|
||||||
return -1
|
return -1
|
||||||
|
|
Loading…
Reference in New Issue