enh: coverity scan for sma
This commit is contained in:
parent
eb301a1d61
commit
fabd96deef
|
@ -159,9 +159,11 @@ struct SRSmaInfo {
|
|||
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
|
||||
STaosQueue *queue; // buffer queue of SubmitReq
|
||||
STaosQall *qall; // buffer qall of SubmitReq
|
||||
#if 0
|
||||
void *iTaskInfo[TSDB_RETENTION_L2]; // immutable qTaskInfo_t
|
||||
STaosQueue *iQueue; // immutable buffer queue of SubmitReq
|
||||
STaosQall *iQall; // immutable buffer qall of SubmitReq
|
||||
#endif
|
||||
};
|
||||
|
||||
#define RSMA_INFO_HEAD_LEN offsetof(SRSmaInfo, items)
|
||||
|
@ -209,6 +211,14 @@ static FORCE_INLINE void tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
|
|||
smaDebug("vgId:%d, unref sma stat:%p, val:%d", SMA_VID(pSma), pStat, ref);
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tdSmaLoopsCheck(int32_t *pCnt, int32_t limit) {
|
||||
++(*pCnt);
|
||||
if (*pCnt > limit) {
|
||||
sched_yield();
|
||||
*pCnt = 0;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t smaPreClose(SSma *pSma);
|
||||
|
||||
// rsma
|
||||
|
|
|
@ -150,11 +150,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
|
|||
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
|
||||
if (isCommit) {
|
||||
while (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 1) != 0) {
|
||||
++nLoops;
|
||||
if (nLoops > 1000) {
|
||||
sched_yield();
|
||||
nLoops = 0;
|
||||
}
|
||||
tdSmaLoopsCheck(&nLoops, 1000);
|
||||
}
|
||||
|
||||
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
|
||||
|
@ -173,11 +169,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
|
|||
} else {
|
||||
smaDebug("vgId:%d, rsma commit%d, fetch tasks are not all finished yet", SMA_VID(pSma), isCommit);
|
||||
}
|
||||
++nLoops;
|
||||
if (nLoops > 1000) {
|
||||
sched_yield();
|
||||
nLoops = 0;
|
||||
}
|
||||
tdSmaLoopsCheck(&nLoops, 1000);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -189,11 +181,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
|
|||
(void *)taosGetSelfPthreadId());
|
||||
nLoops = 0;
|
||||
while (atomic_load_64(&pRSmaStat->nBufItems) > 0) {
|
||||
++nLoops;
|
||||
if (nLoops > 1000) {
|
||||
sched_yield();
|
||||
nLoops = 0;
|
||||
}
|
||||
tdSmaLoopsCheck(&nLoops, 1000);
|
||||
}
|
||||
|
||||
if (!isCommit) goto _exit;
|
||||
|
|
|
@ -278,11 +278,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
|
|||
} else {
|
||||
smaDebug("vgId:%d, rsma fetch tasks are not all finished yet", SMA_VID(pSma));
|
||||
}
|
||||
++nLoops;
|
||||
if (nLoops > 1000) {
|
||||
sched_yield();
|
||||
nLoops = 0;
|
||||
}
|
||||
tdSmaLoopsCheck(&nLoops, 1000);
|
||||
}
|
||||
|
||||
// step 3:
|
||||
|
|
|
@ -99,27 +99,38 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
|
|||
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma),
|
||||
pInfo->suid, i + 1);
|
||||
}
|
||||
|
||||
#if 0
|
||||
if (pInfo->iTaskInfo[i]) {
|
||||
tdRSmaQTaskInfoFree(&pInfo->iTaskInfo[i], SMA_VID(pSma), i + 1);
|
||||
} else {
|
||||
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty iTaskInfo",
|
||||
SMA_VID(pSma), pInfo->suid, i + 1);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
if (isDeepFree) {
|
||||
taosMemoryFreeClear(pInfo->pTSchema);
|
||||
}
|
||||
|
||||
if (isDeepFree) {
|
||||
if (pInfo->queue) taosCloseQueue(pInfo->queue);
|
||||
if (pInfo->qall) taosFreeQall(pInfo->qall);
|
||||
if (pInfo->iQueue) taosCloseQueue(pInfo->iQueue);
|
||||
if (pInfo->iQall) taosFreeQall(pInfo->iQall);
|
||||
pInfo->queue = NULL;
|
||||
pInfo->qall = NULL;
|
||||
pInfo->iQueue = NULL;
|
||||
pInfo->iQall = NULL;
|
||||
if (pInfo->queue) {
|
||||
taosCloseQueue(pInfo->queue);
|
||||
pInfo->queue = NULL;
|
||||
}
|
||||
if (pInfo->qall) {
|
||||
taosFreeQall(pInfo->qall);
|
||||
pInfo->qall = NULL;
|
||||
}
|
||||
#if 0
|
||||
if (pInfo->iQueue) {
|
||||
taosCloseQueue(pInfo->iQueue);
|
||||
pInfo->iQueue = NULL;
|
||||
}
|
||||
if (pInfo->iQall) {
|
||||
taosFreeQall(pInfo->iQall);
|
||||
pInfo->iQall = NULL;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
taosMemoryFree(pInfo);
|
||||
|
@ -376,13 +387,14 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
|
|||
if (!(pRSmaInfo->qall = taosAllocateQall())) {
|
||||
goto _err;
|
||||
}
|
||||
#if 0
|
||||
if (!(pRSmaInfo->iQueue = taosOpenQueue())) {
|
||||
goto _err;
|
||||
}
|
||||
if (!(pRSmaInfo->iQall = taosAllocateQall())) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
#endif
|
||||
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
@ -803,7 +815,11 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
|
|||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo,
|
||||
ERsmaExecType type, int8_t level) {
|
||||
int32_t idx = level - 1;
|
||||
#if 0
|
||||
void *qTaskInfo = (type == RSMA_EXEC_COMMIT) ? RSMA_INFO_IQTASK(pInfo, idx) : RSMA_INFO_QTASK(pInfo, idx);
|
||||
#else
|
||||
void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx);
|
||||
#endif
|
||||
if (!qTaskInfo) {
|
||||
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level,
|
||||
pInfo->suid);
|
||||
|
@ -836,6 +852,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
#if 0
|
||||
static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t srcTaskInfo, SRSmaParam *param,
|
||||
tb_uid_t suid, int8_t idx) {
|
||||
int32_t code = 0;
|
||||
|
@ -884,6 +901,7 @@ _exit:
|
|||
}
|
||||
return code;
|
||||
}
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief Clone qTaskInfo of SRSmaInfo
|
||||
|
@ -920,6 +938,7 @@ static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) {
|
|||
|
||||
if (TABLE_IS_ROLLUP(mr.me.flags)) {
|
||||
param = &mr.me.stbEntry.rsmaParam;
|
||||
#if 0
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
if (!pInfo->iTaskInfo[i]) {
|
||||
continue;
|
||||
|
@ -928,6 +947,7 @@ static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) {
|
|||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
smaDebug("vgId:%d, rsma clone env success for %" PRIi64, SMA_VID(pSma), pInfo->suid);
|
||||
#endif
|
||||
} else {
|
||||
code = TSDB_CODE_RSMA_INVALID_SCHEMA;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
|
|
@ -10,17 +10,19 @@ sql use d0
|
|||
|
||||
print =============== create super table and register rsma
|
||||
sql create table if not exists stb (ts timestamp, c1 int, c2 float) tags (city binary(20),district binary(20)) rollup(max) max_delay 5s,5s watermark 2s,3s;
|
||||
sql create table if not exists stb1 (ts timestamp, c1 int, c2 float) tags (city binary(20),district binary(20)) rollup(max) max_delay 5s,5s watermark 2s,3s;
|
||||
|
||||
sql show stables
|
||||
if $rows != 1 then
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== create child table
|
||||
sql create table ct1 using stb tags("BeiJing", "ChaoYang");
|
||||
sql create table ct_1 using stb1 tags("BeiJing", "ChaoYang");
|
||||
|
||||
sql show tables
|
||||
if $rows != 1 then
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -28,6 +30,9 @@ print =============== insert data and trigger rollup
|
|||
sql insert into ct1 values(now, 10, 10.0);
|
||||
sql insert into ct1 values(now+1s, 1, 1.0);
|
||||
sql insert into ct1 values(now+2s, 100, 100.0);
|
||||
sql insert into ct_1 values(now, 10, 10.0);
|
||||
sql insert into ct_1 values(now+1s, 1, 1.0);
|
||||
sql insert into ct_1 values(now+2s, 100, 100.0);
|
||||
|
||||
print =============== wait maxdelay 5+2 seconds for results
|
||||
sleep 7000
|
||||
|
@ -41,6 +46,20 @@ if $rows > 2 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 100 then
|
||||
if $data01 != 10 then
|
||||
print retention level 2 file result $data01 != 100 or 10
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
sql select * from ct_1;
|
||||
print $data00 $data01 $data02
|
||||
print $data10 $data11 $data12
|
||||
if $rows > 2 then
|
||||
print retention level 2 file rows $rows > 2
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 100 then
|
||||
if $data01 != 10 then
|
||||
|
@ -65,6 +84,21 @@ if $data01 != 100 then
|
|||
endi
|
||||
endi
|
||||
|
||||
sql select * from ct_1 where ts > now-8d;
|
||||
print $data00 $data01 $data02
|
||||
print $data10 $data11 $data12
|
||||
if $rows > 2 then
|
||||
print retention level 1 file rows $rows > 2
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 100 then
|
||||
if $data01 != 10 then
|
||||
print retention level 1 file result $data01 != 100 or 10
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
print =============== select * from retention level 0 from memory
|
||||
sql select * from ct1 where ts > now-3d;
|
||||
print $data00 $data01 $data02
|
||||
|
@ -81,6 +115,21 @@ if $data01 != 10 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql select * from ct_1 where ts > now-3d;
|
||||
print $data00 $data01 $data02
|
||||
print $data10 $data11 $data12
|
||||
print $data20 $data21 $data22
|
||||
|
||||
if $rows < 1 then
|
||||
print retention level 0 file rows $rows < 1
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 10 then
|
||||
print retention level 0 file result $data01 != 10
|
||||
return -1
|
||||
endi
|
||||
|
||||
#===================================================================
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
|
@ -97,6 +146,22 @@ if $rows > 2 then
|
|||
endi
|
||||
|
||||
|
||||
if $data01 != 100 then
|
||||
if $data01 != 10 then
|
||||
print retention level 2 file result $data01 != 100 or 10
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
sql select * from ct_1;
|
||||
print $data00 $data01 $data02
|
||||
print $data10 $data11 $data12
|
||||
if $rows > 2 then
|
||||
print retention level 2 file rows $rows > 2
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
if $data01 != 100 then
|
||||
if $data01 != 10 then
|
||||
print retention level 2 file result $data01 != 100 or 10
|
||||
|
@ -120,6 +185,21 @@ if $data01 != 100 then
|
|||
endi
|
||||
endi
|
||||
|
||||
sql select * from ct_1 where ts > now-8d;
|
||||
print $data00 $data01 $data02
|
||||
print $data10 $data11 $data12
|
||||
if $rows > 2 then
|
||||
print retention level 1 file rows $rows > 2
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 100 then
|
||||
if $data01 != 10 then
|
||||
print retention level 1 file result $data01 != 100 or 10
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
print =============== select * from retention level 0 from memory after reboot
|
||||
sql select * from ct1 where ts > now-3d;
|
||||
print $data00 $data01 $data02
|
||||
|
@ -136,6 +216,21 @@ if $data01 != 10 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql select * from ct_1 where ts > now-3d;
|
||||
print $data00 $data01 $data02
|
||||
print $data10 $data11 $data12
|
||||
print $data20 $data21 $data22
|
||||
|
||||
if $rows < 1 then
|
||||
print retention level 0 file rows $rows < 1
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 10 then
|
||||
print retention level 0 file result $data01 != 10
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
#==================== flush database to trigger commit data to file
|
||||
sql flush database d0;
|
||||
|
@ -158,6 +253,21 @@ if $data01 != 100 then
|
|||
endi
|
||||
endi
|
||||
|
||||
sql select * from ct_1;
|
||||
print $data00 $data01 $data02
|
||||
print $data10 $data11 $data12
|
||||
if $rows > 2 then
|
||||
print retention level 2 file rows $rows > 2
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 100 then
|
||||
if $data01 != 10 then
|
||||
print retention level 2 file result $data01 != 100 or 10
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
print =============== select * from retention level 1 from file
|
||||
sql select * from ct1 where ts > now-8d;
|
||||
print $data00 $data01 $data02
|
||||
|
@ -174,6 +284,21 @@ if $data01 != 100 then
|
|||
endi
|
||||
endi
|
||||
|
||||
sql select * from ct_1 where ts > now-8d;
|
||||
print $data00 $data01 $data02
|
||||
print $data10 $data11 $data12
|
||||
if $rows > 2 then
|
||||
print retention level 1 file rows $rows > 2
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 100 then
|
||||
if $data01 != 10 then
|
||||
print retention level 1 file result $data01 != 100 or 10
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
print =============== select * from retention level 0 from file
|
||||
sql select * from ct1 where ts > now-3d;
|
||||
print $data00 $data01 $data02
|
||||
|
@ -189,9 +314,25 @@ if $data01 != 10 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql select * from ct_1 where ts > now-3d;
|
||||
print $data00 $data01 $data02
|
||||
print $data10 $data11 $data12
|
||||
print $data20 $data21 $data22
|
||||
if $rows < 1 then
|
||||
print retention level 0 file rows $rows < 1
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 10 then
|
||||
print retention level 0 file result $data01 != 10
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== insert after rsma qtaskinfo recovery
|
||||
sql insert into ct1 values(now, 50, 500.0);
|
||||
sql insert into ct1 values(now+1s, 40, 40.0);
|
||||
sql insert into ct_1 values(now, 50, 500.0);
|
||||
sql insert into ct_1 values(now+1s, 40, 40.0);
|
||||
|
||||
print =============== wait maxdelay 5+2 seconds for results
|
||||
sleep 7000
|
||||
|
@ -219,6 +360,28 @@ if $data02 != 500.00000 then
|
|||
endi
|
||||
endi
|
||||
|
||||
sql select * from ct_1;
|
||||
print $data00 $data01 $data02
|
||||
print $data10 $data11 $data12
|
||||
if $rows > 2 then
|
||||
print retention level 2 file/mem rows $rows > 2
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 100 then
|
||||
if $data01 != 10 then
|
||||
print retention level 2 file/mem result $data01 != 100 or 10
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data02 != 500.00000 then
|
||||
if $data02 != 100.00000 then
|
||||
print retention level 1 file/mem result $data02 != 500.00000 or 100.00000
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
print =============== select * from retention level 1 from file and memory after rsma qtaskinfo recovery
|
||||
sql select * from ct1 where ts > now-8d;
|
||||
print $data00 $data01 $data02
|
||||
|
@ -242,6 +405,28 @@ if $data02 != 500.00000 then
|
|||
endi
|
||||
endi
|
||||
|
||||
sql select * from ct_1 where ts > now-8d;
|
||||
print $data00 $data01 $data02
|
||||
print $data10 $data11 $data12
|
||||
if $rows > 2 then
|
||||
print retention level 1 file/mem rows $rows > 2
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 100 then
|
||||
if $data01 != 10 then
|
||||
print retention level 1 file/mem result $data01 != 100 or 10
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data02 != 500.00000 then
|
||||
if $data02 != 100.00000 then
|
||||
print retention level 1 file/mem result $data02 != 500.00000 or 100.00000
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
|
||||
print =============== select * from retention level 0 from file and memory after rsma qtaskinfo recovery
|
||||
sql select * from ct1 where ts > now-3d;
|
||||
|
@ -292,6 +477,61 @@ if $data42 != 40.00000 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql select * from ct_1 where ts > now-3d;
|
||||
print $data00 $data01 $data02
|
||||
print $data10 $data11 $data12
|
||||
print $data20 $data21 $data22
|
||||
print $data30 $data31 $data32
|
||||
print $data40 $data41 $data42
|
||||
|
||||
if $rows < 1 then
|
||||
print retention level 0 file/mem rows $rows < 1
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 10 then
|
||||
print retention level 0 file/mem result $data01 != 10
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data11 != 1 then
|
||||
print retention level 0 file/mem result $data11 != 1
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data21 != 100 then
|
||||
print retention level 0 file/mem result $data21 != 100
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data31 != 50 then
|
||||
print retention level 0 file/mem result $data31 != 50
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data32 != 500.00000 then
|
||||
print retention level 0 file/mem result $data32 != 500.00000
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
if $data41 != 40 then
|
||||
print retention level 0 file/mem result $data41 != 40
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data42 != 40.00000 then
|
||||
print retention level 0 file/mem result $data42 != 40.00000
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== drop stb1
|
||||
sql drop table stb1;
|
||||
sql flush database d0;
|
||||
|
||||
print =============== select * from retention level 0 from file and memory after rsma qtaskinfo recovery
|
||||
sql_error select * from ct_1 where ts > now-3d;
|
||||
sql_error select * from ct_1 where ts > now-8d;
|
||||
sql_error select * from ct_1;
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
Loading…
Reference in New Issue