ehn: remove void
This commit is contained in:
parent
b04eb61397
commit
53caf9f018
|
@ -1316,9 +1316,9 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
||||||
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d,QID:0x%" PRIx64,
|
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d,QID:0x%" PRIx64,
|
||||||
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
|
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
|
||||||
code = refreshMeta(pRequest->pTscObj, pRequest);
|
code = refreshMeta(pRequest->pTscObj, pRequest);
|
||||||
if (code != 0){
|
if (code != 0) {
|
||||||
tscWarn("0x%" PRIx64 " refresh meta failed, code:%d - %s,QID:0x%" PRIx64, pRequest->self, code,
|
tscWarn("0x%" PRIx64 " refresh meta failed, code:%d - %s,QID:0x%" PRIx64, pRequest->self, code, tstrerror(code),
|
||||||
tstrerror(code), pRequest->requestId);
|
pRequest->requestId);
|
||||||
}
|
}
|
||||||
pRequest->prevCode = code;
|
pRequest->prevCode = code;
|
||||||
doAsyncQuery(pRequest, true);
|
doAsyncQuery(pRequest, true);
|
||||||
|
@ -1985,7 +1985,9 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col
|
||||||
|
|
||||||
STscStmt2 *pStmt = (STscStmt2 *)stmt;
|
STscStmt2 *pStmt = (STscStmt2 *)stmt;
|
||||||
if (pStmt->options.asyncExecFn && !pStmt->semWaited) {
|
if (pStmt->options.asyncExecFn && !pStmt->semWaited) {
|
||||||
(void)tsem_wait(&pStmt->asyncQuerySem);
|
if (tsem_wait(&pStmt->asyncQuerySem) != 0) {
|
||||||
|
tscError("wait async query sem failed");
|
||||||
|
}
|
||||||
pStmt->semWaited = true;
|
pStmt->semWaited = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -698,7 +698,9 @@ static void* stmtBindThreadFunc(void* param) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)stmtAsyncOutput(pStmt, asyncParam);
|
if (stmtAsyncOutput(pStmt, asyncParam) != 0) {
|
||||||
|
qError("stmt async output failed");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
qInfo("stmt bind thread stopped");
|
qInfo("stmt bind thread stopped");
|
||||||
|
@ -822,7 +824,11 @@ TAOS_STMT2* stmtInit2(STscObj* taos, TAOS_STMT2_OPTION* pOptions) {
|
||||||
|
|
||||||
pStmt->sql.siInfo.tableColsReady = true;
|
pStmt->sql.siInfo.tableColsReady = true;
|
||||||
if (pStmt->options.asyncExecFn) {
|
if (pStmt->options.asyncExecFn) {
|
||||||
(void)tsem_init(&pStmt->asyncQuerySem, 0, 1);
|
if (tsem_init(&pStmt->asyncQuerySem, 0, 1) != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
(void)stmtClose(pStmt);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pStmt->semWaited = false;
|
pStmt->semWaited = false;
|
||||||
|
|
||||||
|
@ -1603,7 +1609,9 @@ static void asyncQueryCb(void* userdata, TAOS_RES* res, int code) {
|
||||||
(void)stmtCleanExecInfo(pStmt, (code ? false : true), false);
|
(void)stmtCleanExecInfo(pStmt, (code ? false : true), false);
|
||||||
++pStmt->sql.runTimes;
|
++pStmt->sql.runTimes;
|
||||||
|
|
||||||
(void)tsem_post(&pStmt->asyncQuerySem);
|
if (tsem_post(&pStmt->asyncQuerySem) != 0) {
|
||||||
|
tscError("failed to post asyncQuerySem");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
|
int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
|
||||||
|
@ -1710,7 +1718,9 @@ int stmtClose2(TAOS_STMT2* stmt) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStmt->options.asyncExecFn && !pStmt->semWaited) {
|
if (pStmt->options.asyncExecFn && !pStmt->semWaited) {
|
||||||
(void)tsem_wait(&pStmt->asyncQuerySem);
|
if (tsem_wait(&pStmt->asyncQuerySem) != 0) {
|
||||||
|
tscError("failed to wait asyncQuerySem");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
STMT_DLOG("stmt %p closed, stbInterlaceMode: %d, statInfo: ctgGetTbMetaNum=>%" PRId64 ", getCacheTbInfo=>%" PRId64
|
STMT_DLOG("stmt %p closed, stbInterlaceMode: %d, statInfo: ctgGetTbMetaNum=>%" PRId64 ", getCacheTbInfo=>%" PRId64
|
||||||
|
@ -1727,7 +1737,9 @@ int stmtClose2(TAOS_STMT2* stmt) {
|
||||||
STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
|
STMT_ERR_RET(stmtCleanSQLInfo(pStmt));
|
||||||
|
|
||||||
if (pStmt->options.asyncExecFn) {
|
if (pStmt->options.asyncExecFn) {
|
||||||
(void)tsem_destroy(&pStmt->asyncQuerySem);
|
if (tsem_destroy(&pStmt->asyncQuerySem) != 0) {
|
||||||
|
tscError("failed to destroy asyncQuerySem");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
taosMemoryFree(stmt);
|
taosMemoryFree(stmt);
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,7 @@ int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
|
||||||
SVnodeObj *pVnode = NULL;
|
SVnodeObj *pVnode = NULL;
|
||||||
|
|
||||||
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
||||||
(void)taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
|
int32_t r = taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
|
||||||
if (pVnode != NULL) {
|
if (pVnode != NULL) {
|
||||||
diskId = pVnode->diskPrimary;
|
diskId = pVnode->diskPrimary;
|
||||||
}
|
}
|
||||||
|
@ -97,7 +97,7 @@ SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
|
||||||
SVnodeObj *pVnode = NULL;
|
SVnodeObj *pVnode = NULL;
|
||||||
|
|
||||||
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
||||||
(void)taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
|
int32_t r = taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
|
||||||
if (pVnode == NULL || strict && (pVnode->dropped || pVnode->failed)) {
|
if (pVnode == NULL || strict && (pVnode->dropped || pVnode->failed)) {
|
||||||
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||||
pVnode = NULL;
|
pVnode = NULL;
|
||||||
|
@ -165,7 +165,7 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
|
||||||
|
|
||||||
(void)taosThreadRwlockWrlock(&pMgmt->lock);
|
(void)taosThreadRwlockWrlock(&pMgmt->lock);
|
||||||
SVnodeObj *pOld = NULL;
|
SVnodeObj *pOld = NULL;
|
||||||
(void)taosHashGetDup(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
|
int32_t r = taosHashGetDup(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
|
||||||
if (pOld) {
|
if (pOld) {
|
||||||
vmFreeVnodeObj(&pOld);
|
vmFreeVnodeObj(&pOld);
|
||||||
}
|
}
|
||||||
|
@ -184,7 +184,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)taosThreadRwlockWrlock(&pMgmt->lock);
|
(void)taosThreadRwlockWrlock(&pMgmt->lock);
|
||||||
(void)taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
|
int32_t r = taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
|
||||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
||||||
vmReleaseVnode(pMgmt, pVnode);
|
vmReleaseVnode(pMgmt, pVnode);
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
|
#include "mndVgroup.h"
|
||||||
#include "audit.h"
|
#include "audit.h"
|
||||||
#include "mndArbGroup.h"
|
#include "mndArbGroup.h"
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
|
@ -26,7 +27,6 @@
|
||||||
#include "mndTopic.h"
|
#include "mndTopic.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndUser.h"
|
#include "mndUser.h"
|
||||||
#include "mndVgroup.h"
|
|
||||||
#include "tmisce.h"
|
#include "tmisce.h"
|
||||||
|
|
||||||
#define VGROUP_VER_NUMBER 1
|
#define VGROUP_VER_NUMBER 1
|
||||||
|
@ -1670,7 +1670,9 @@ int32_t mndAddNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_CHECK_GOTO(mndTransAppendPrepareLog(pTrans, pRaw), NULL, _err);
|
TAOS_CHECK_GOTO(mndTransAppendPrepareLog(pTrans, pRaw), NULL, _err);
|
||||||
(void)sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
|
if (sdbSetRawStatus(pRaw, SDB_STATUS_CREATING) != 0) {
|
||||||
|
mError("vgId:%d, failed to set raw status at line:%d", pVg->vgId, __LINE__);
|
||||||
|
}
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("vgId:%d, failed to set raw status since %s at line:%d", pVg->vgId, tstrerror(code), __LINE__);
|
mError("vgId:%d, failed to set raw status since %s at line:%d", pVg->vgId, tstrerror(code), __LINE__);
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
|
|
|
@ -38,7 +38,9 @@ int32_t sndBuildStreamTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProce
|
||||||
streamTaskOpenAllUpstreamInput(pTask);
|
streamTaskOpenAllUpstreamInput(pTask);
|
||||||
|
|
||||||
streamTaskResetUpstreamStageInfo(pTask);
|
streamTaskResetUpstreamStageInfo(pTask);
|
||||||
(void)streamSetupScheduleTrigger(pTask);
|
if (streamSetupScheduleTrigger(pTask) != 0) {
|
||||||
|
sndError("failed to setup schedule trigger for task:%s", pTask->id.idStr);
|
||||||
|
}
|
||||||
|
|
||||||
SCheckpointInfo *pChkInfo = &pTask->chkInfo;
|
SCheckpointInfo *pChkInfo = &pTask->chkInfo;
|
||||||
tqSetRestoreVersionInfo(pTask);
|
tqSetRestoreVersionInfo(pTask);
|
||||||
|
@ -93,14 +95,18 @@ FAIL:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sndInit(SSnode *pSnode) {
|
int32_t sndInit(SSnode *pSnode) {
|
||||||
(void)streamTaskSchedTask(&pSnode->msgCb, pSnode->pMeta->vgId, 0, 0, STREAM_EXEC_T_START_ALL_TASKS);
|
if (streamTaskSchedTask(&pSnode->msgCb, pSnode->pMeta->vgId, 0, 0, STREAM_EXEC_T_START_ALL_TASKS) != 0) {
|
||||||
|
sndError("failed to start all tasks");
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void sndClose(SSnode *pSnode) {
|
void sndClose(SSnode *pSnode) {
|
||||||
stopRsync();
|
stopRsync();
|
||||||
streamMetaNotifyClose(pSnode->pMeta);
|
streamMetaNotifyClose(pSnode->pMeta);
|
||||||
(void)streamMetaCommit(pSnode->pMeta);
|
if (streamMetaCommit(pSnode->pMeta) != 0) {
|
||||||
|
sndError("failed to commit stream meta");
|
||||||
|
}
|
||||||
streamMetaClose(pSnode->pMeta);
|
streamMetaClose(pSnode->pMeta);
|
||||||
taosMemoryFree(pSnode);
|
taosMemoryFree(pSnode);
|
||||||
}
|
}
|
||||||
|
|
|
@ -211,7 +211,10 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
|
||||||
SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat);
|
SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat);
|
||||||
pRSmaStat->pSma = (SSma *)pSma;
|
pRSmaStat->pSma = (SSma *)pSma;
|
||||||
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT);
|
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT);
|
||||||
(void)tsem_init(&pRSmaStat->notEmpty, 0, 0);
|
if (tsem_init(&pRSmaStat->notEmpty, 0, 0) != 0) {
|
||||||
|
code = terrno;
|
||||||
|
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||||
|
}
|
||||||
if (!(pRSmaStat->blocks = taosArrayInit(1, sizeof(SSDataBlock)))) {
|
if (!(pRSmaStat->blocks = taosArrayInit(1, sizeof(SSDataBlock)))) {
|
||||||
code = terrno;
|
code = terrno;
|
||||||
TAOS_CHECK_GOTO(code, &lino, _exit);
|
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||||
|
@ -295,7 +298,10 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
|
||||||
taosHashCleanup(RSMA_INFO_HASH(pStat));
|
taosHashCleanup(RSMA_INFO_HASH(pStat));
|
||||||
|
|
||||||
// step 5: free pStat
|
// step 5: free pStat
|
||||||
(void)tsem_destroy(&(pStat->notEmpty));
|
if (tsem_destroy(&(pStat->notEmpty)) != 0) {
|
||||||
|
smaError("vgId:%d, failed to destroy notEmpty semaphore for rsma stat:%p since %s", SMA_VID(pSma), pRSmaStat,
|
||||||
|
tstrerror(terrno));
|
||||||
|
}
|
||||||
taosArrayDestroy(pStat->blocks);
|
taosArrayDestroy(pStat->blocks);
|
||||||
taosMemoryFreeClear(pStat);
|
taosMemoryFreeClear(pStat);
|
||||||
}
|
}
|
||||||
|
@ -399,7 +405,7 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
|
||||||
void *tdRSmaExecutorFunc(void *param) {
|
void *tdRSmaExecutorFunc(void *param) {
|
||||||
setThreadName("vnode-rsma");
|
setThreadName("vnode-rsma");
|
||||||
|
|
||||||
if(tdRSmaProcessExecImpl((SSma *)param, RSMA_EXEC_OVERFLOW) < 0){
|
if (tdRSmaProcessExecImpl((SSma *)param, RSMA_EXEC_OVERFLOW) < 0) {
|
||||||
smaError("vgId:%d, failed to process rsma exec", SMA_VID((SSma *)param));
|
smaError("vgId:%d, failed to process rsma exec", SMA_VID((SSma *)param));
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -444,7 +450,9 @@ static int32_t tdRsmaStopExecutor(const SSma *pSma) {
|
||||||
pthread = (TdThread *)&pStat->data;
|
pthread = (TdThread *)&pStat->data;
|
||||||
|
|
||||||
for (int32_t i = 0; i < tsNumOfVnodeRsmaThreads; ++i) {
|
for (int32_t i = 0; i < tsNumOfVnodeRsmaThreads; ++i) {
|
||||||
(void)tsem_post(&(pRSmaStat->notEmpty));
|
if (tsem_post(&(pRSmaStat->notEmpty)) != 0) {
|
||||||
|
smaError("vgId:%d, failed to post notEmpty semaphore for rsma since %s", SMA_VID(pSma), tstrerror(terrno));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < tsNumOfVnodeRsmaThreads; ++i) {
|
for (int32_t i = 0; i < tsNumOfVnodeRsmaThreads; ++i) {
|
||||||
|
|
|
@ -1707,7 +1707,9 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)tsem_wait(&pRSmaStat->notEmpty);
|
if (tsem_wait(&pRSmaStat->notEmpty) != 0) {
|
||||||
|
smaError("vgId:%d, failed to wait for not empty since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
|
}
|
||||||
|
|
||||||
if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) {
|
if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) {
|
||||||
smaDebug("vgId:%d, exec task end, flag:%" PRIi8 ", nBufItems:%" PRIi64, SMA_VID(pSma), pEnv->flag,
|
smaDebug("vgId:%d, exec task end, flag:%" PRIi8 ", nBufItems:%" PRIi64, SMA_VID(pSma), pEnv->flag,
|
||||||
|
|
|
@ -1474,6 +1474,9 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo
|
||||||
TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));
|
TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema));
|
||||||
|
|
||||||
ctxArray = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx));
|
ctxArray = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx));
|
||||||
|
if (ctxArray == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(terrno, &lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
// 1. prepare last
|
// 1. prepare last
|
||||||
STsdbRowKey tsdbRowKey = {0};
|
STsdbRowKey tsdbRowKey = {0};
|
||||||
|
|
|
@ -331,7 +331,9 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
|
||||||
TAOS_CHECK_GOTO(tfsAllocDisk(committer->tsdb->pVnode->pTfs, committer->ctx->expLevel, &committer->ctx->did), &lino,
|
TAOS_CHECK_GOTO(tfsAllocDisk(committer->tsdb->pVnode->pTfs, committer->ctx->expLevel, &committer->ctx->did), &lino,
|
||||||
_exit);
|
_exit);
|
||||||
|
|
||||||
TAOS_UNUSED(tfsMkdirRecurAt(committer->tsdb->pVnode->pTfs, committer->tsdb->path, committer->ctx->did));
|
if (tfsMkdirRecurAt(committer->tsdb->pVnode->pTfs, committer->tsdb->path, committer->ctx->did) != 0) {
|
||||||
|
tsdbError("vgId:%d failed to create directory %s", TD_VID(committer->tsdb->pVnode), committer->tsdb->path);
|
||||||
|
}
|
||||||
committer->ctx->tbid->suid = 0;
|
committer->ctx->tbid->suid = 0;
|
||||||
committer->ctx->tbid->uid = 0;
|
committer->ctx->tbid->uid = 0;
|
||||||
|
|
||||||
|
|
|
@ -1362,7 +1362,7 @@ int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAl
|
||||||
};
|
};
|
||||||
for (int i = 0; i < TOMB_BLOCK_SIZE(tombBlock); i++) {
|
for (int i = 0; i < TOMB_BLOCK_SIZE(tombBlock); i++) {
|
||||||
STombRecord record;
|
STombRecord record;
|
||||||
TAOS_UNUSED(tTombBlockGet(tombBlock, i, &record));
|
TAOS_CHECK_RETURN(tTombBlockGet(tombBlock, i, &record));
|
||||||
|
|
||||||
if (i == 0) {
|
if (i == 0) {
|
||||||
tombBlk.minTbid.suid = record.suid;
|
tombBlk.minTbid.suid = record.suid;
|
||||||
|
@ -1519,7 +1519,7 @@ static int32_t tsdbDataFileDoWriteTombRecord(SDataFileWriter *writer, const STom
|
||||||
while (writer->ctx->hasOldTomb) {
|
while (writer->ctx->hasOldTomb) {
|
||||||
for (; writer->ctx->tombBlockIdx < TOMB_BLOCK_SIZE(writer->ctx->tombBlock); writer->ctx->tombBlockIdx++) {
|
for (; writer->ctx->tombBlockIdx < TOMB_BLOCK_SIZE(writer->ctx->tombBlock); writer->ctx->tombBlockIdx++) {
|
||||||
STombRecord record1[1];
|
STombRecord record1[1];
|
||||||
TAOS_UNUSED(tTombBlockGet(writer->ctx->tombBlock, writer->ctx->tombBlockIdx, record1));
|
TAOS_CHECK_GOTO(tTombBlockGet(writer->ctx->tombBlock, writer->ctx->tombBlockIdx, record1), &lino, _exit);
|
||||||
|
|
||||||
int32_t c = tTombRecordCompare(record, record1);
|
int32_t c = tTombRecordCompare(record, record1);
|
||||||
if (c < 0) {
|
if (c < 0) {
|
||||||
|
|
|
@ -225,7 +225,7 @@ static int32_t tsdbMemTableIterNext(STsdbIter *iter, const TABLEID *tbid) {
|
||||||
|
|
||||||
iter->row->row = row[0];
|
iter->row->row = row[0];
|
||||||
|
|
||||||
TAOS_UNUSED(tsdbTbDataIterNext(iter->memtData->tbIter));
|
bool r = tsdbTbDataIterNext(iter->memtData->tbIter);
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -193,7 +193,11 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
|
||||||
pMemTable->minVer = TMIN(pMemTable->minVer, version);
|
pMemTable->minVer = TMIN(pMemTable->minVer, version);
|
||||||
pMemTable->maxVer = TMAX(pMemTable->maxVer, version);
|
pMemTable->maxVer = TMAX(pMemTable->maxVer, version);
|
||||||
|
|
||||||
TAOS_UNUSED(tsdbCacheDel(pTsdb, suid, uid, sKey, eKey));
|
if (tsdbCacheDel(pTsdb, suid, uid, sKey, eKey) != 0) {
|
||||||
|
tsdbError("vgId:%d, failed to delete cache data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64
|
||||||
|
" eKey:%" PRId64 " at version %" PRId64,
|
||||||
|
TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version);
|
||||||
|
}
|
||||||
|
|
||||||
tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
|
tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
|
||||||
" at version %" PRId64,
|
" at version %" PRId64,
|
||||||
|
@ -652,7 +656,10 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
|
if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
|
||||||
TAOS_UNUSED(tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData));
|
if (tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData) != 0) {
|
||||||
|
tsdbError("vgId:%d, failed to update cache data from table suid:%" PRId64 " uid:%" PRId64 " at version %" PRId64,
|
||||||
|
TD_VID(pMemTable->pTsdb->pVnode), pTbData->suid, pTbData->uid, version);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SMemTable
|
// SMemTable
|
||||||
|
|
|
@ -403,7 +403,8 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
|
||||||
pBlockLoadInfo->cost.loadStatisBlocks += num;
|
pBlockLoadInfo->cost.loadStatisBlocks += num;
|
||||||
|
|
||||||
STbStatisBlock block;
|
STbStatisBlock block;
|
||||||
TAOS_UNUSED(tStatisBlockInit(&block));
|
code = tStatisBlockInit(&block);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
|
|
|
@ -691,7 +691,7 @@ static int32_t tsdbDoS3Migrate(SRTNer *rtner) {
|
||||||
if (/*lcn < 1 && */ taosCheckExistFile(fobj->fname)) {
|
if (/*lcn < 1 && */ taosCheckExistFile(fobj->fname)) {
|
||||||
int32_t mtime = 0;
|
int32_t mtime = 0;
|
||||||
int64_t size = 0;
|
int64_t size = 0;
|
||||||
(void)taosStatFile(fobj->fname, &size, &mtime, NULL);
|
int32_t r = taosStatFile(fobj->fname, &size, &mtime, NULL);
|
||||||
if (size > chunksize && mtime < rtner->now - tsS3UploadDelaySec) {
|
if (size > chunksize && mtime < rtner->now - tsS3UploadDelaySec) {
|
||||||
if (pCfg->s3Compact && lcn < 0) {
|
if (pCfg->s3Compact && lcn < 0) {
|
||||||
extern int32_t tsdbAsyncCompact(STsdb * tsdb, const STimeWindow *tw, bool sync);
|
extern int32_t tsdbAsyncCompact(STsdb * tsdb, const STimeWindow *tw, bool sync);
|
||||||
|
|
|
@ -821,7 +821,9 @@ static int32_t tsdbSnapWriteFileSetBegin(STsdbSnapWriter* writer, int32_t fid) {
|
||||||
code = TSDB_CODE_NO_AVAIL_DISK;
|
code = TSDB_CODE_NO_AVAIL_DISK;
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
TAOS_UNUSED(tfsMkdirRecurAt(writer->tsdb->pVnode->pTfs, writer->tsdb->path, writer->ctx->did));
|
if (tfsMkdirRecurAt(writer->tsdb->pVnode->pTfs, writer->tsdb->path, writer->ctx->did) != 0) {
|
||||||
|
tsdbError("vgId:%d failed to create directory %s", TD_VID(writer->tsdb->pVnode), writer->tsdb->path);
|
||||||
|
}
|
||||||
|
|
||||||
writer->ctx->hasData = true;
|
writer->ctx->hasData = true;
|
||||||
writer->ctx->hasTomb = true;
|
writer->ctx->hasTomb = true;
|
||||||
|
|
|
@ -106,7 +106,7 @@ _exit:
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
void tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)) {
|
void tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)) {
|
||||||
TAOS_UNUSED(tGetItemFn(pMapData->pData + pMapData->aOffset[idx], pItem));
|
int32_t r = tGetItemFn(pMapData->pData + pMapData->aOffset[idx], pItem);
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef BUILD_NO_CALL
|
#ifdef BUILD_NO_CALL
|
||||||
|
|
|
@ -544,7 +544,9 @@ void vnodeClose(SVnode *pVnode) {
|
||||||
vnodeCloseBufPool(pVnode);
|
vnodeCloseBufPool(pVnode);
|
||||||
|
|
||||||
// destroy handle
|
// destroy handle
|
||||||
(void)tsem_destroy(&pVnode->syncSem);
|
if (tsem_destroy(&pVnode->syncSem) != 0) {
|
||||||
|
vError("vgId:%d, failed to destroy semaphore", TD_VID(pVnode));
|
||||||
|
}
|
||||||
(void)taosThreadCondDestroy(&pVnode->poolNotEmpty);
|
(void)taosThreadCondDestroy(&pVnode->poolNotEmpty);
|
||||||
(void)taosThreadMutexDestroy(&pVnode->mutex);
|
(void)taosThreadMutexDestroy(&pVnode->mutex);
|
||||||
(void)taosThreadMutexDestroy(&pVnode->lock);
|
(void)taosThreadMutexDestroy(&pVnode->lock);
|
||||||
|
|
|
@ -210,7 +210,9 @@ static int32_t vnodePreProcessDropTtlMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)tSerializeSVDropTtlTableReq((char *)pContNew + sizeof(SMsgHead), reqLenNew, &ttlReq);
|
if (tSerializeSVDropTtlTableReq((char *)pContNew + sizeof(SMsgHead), reqLenNew, &ttlReq) != 0) {
|
||||||
|
vError("vgId:%d %s:%d failed to serialize drop ttl request", TD_VID(pVnode), __func__, lino);
|
||||||
|
}
|
||||||
pContNew->contLen = htonl(reqLenNew);
|
pContNew->contLen = htonl(reqLenNew);
|
||||||
pContNew->vgId = pContOld->vgId;
|
pContNew->vgId = pContOld->vgId;
|
||||||
|
|
||||||
|
@ -420,7 +422,9 @@ static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
((SMsgHead *)pCont)->vgId = TD_VID(pVnode);
|
((SMsgHead *)pCont)->vgId = TD_VID(pVnode);
|
||||||
|
|
||||||
tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size);
|
tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size);
|
||||||
(void)tEncodeDeleteRes(pCoder, &res);
|
if (tEncodeDeleteRes(pCoder, &res) != 0) {
|
||||||
|
vError("vgId:%d %s failed to encode delete response", TD_VID(pVnode), __func__);
|
||||||
|
}
|
||||||
tEncoderClear(pCoder);
|
tEncoderClear(pCoder);
|
||||||
|
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
@ -647,7 +651,9 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
|
||||||
} break;
|
} break;
|
||||||
case TDMT_STREAM_CONSEN_CHKPT: {
|
case TDMT_STREAM_CONSEN_CHKPT: {
|
||||||
if (pVnode->restored) {
|
if (pVnode->restored) {
|
||||||
(void)tqProcessTaskConsenChkptIdReq(pVnode->pTq, pMsg);
|
if (tqProcessTaskConsenChkptIdReq(pVnode->pTq, pMsg) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
case TDMT_STREAM_TASK_PAUSE: {
|
case TDMT_STREAM_TASK_PAUSE: {
|
||||||
|
@ -664,7 +670,9 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
|
||||||
} break;
|
} break;
|
||||||
case TDMT_VND_STREAM_TASK_RESET: {
|
case TDMT_VND_STREAM_TASK_RESET: {
|
||||||
if (pVnode->restored && vnodeIsLeader(pVnode)) {
|
if (pVnode->restored && vnodeIsLeader(pVnode)) {
|
||||||
(void)tqProcessTaskResetReq(pVnode->pTq, pMsg);
|
if (tqProcessTaskResetReq(pVnode->pTq, pMsg) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
case TDMT_VND_ALTER_CONFIRM:
|
case TDMT_VND_ALTER_CONFIRM:
|
||||||
|
@ -1215,7 +1223,9 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
|
||||||
int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
|
int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
|
||||||
|
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
(void)tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB);
|
if (tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB) < 0) {
|
||||||
|
vError("vgId:%d, failed to get name from string", TD_VID(pVnode));
|
||||||
|
}
|
||||||
|
|
||||||
SStringBuilder sb = {0};
|
SStringBuilder sb = {0};
|
||||||
for (int32_t i = 0; i < tbNames->size; i++) {
|
for (int32_t i = 0; i < tbNames->size; i++) {
|
||||||
|
@ -1942,7 +1952,9 @@ _exit:
|
||||||
tEncodeSize(tEncodeSSubmitRsp2, pSubmitRsp, pRsp->contLen, ret);
|
tEncodeSize(tEncodeSSubmitRsp2, pSubmitRsp, pRsp->contLen, ret);
|
||||||
pRsp->pCont = rpcMallocCont(pRsp->contLen);
|
pRsp->pCont = rpcMallocCont(pRsp->contLen);
|
||||||
tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
|
tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
|
||||||
(void)tEncodeSSubmitRsp2(&ec, pSubmitRsp);
|
if (tEncodeSSubmitRsp2(&ec, pSubmitRsp) < 0) {
|
||||||
|
vError("vgId:%d, failed to encode submit response", TD_VID(pVnode));
|
||||||
|
}
|
||||||
tEncoderClear(&ec);
|
tEncoderClear(&ec);
|
||||||
|
|
||||||
// update statistics
|
// update statistics
|
||||||
|
|
|
@ -204,19 +204,23 @@ static EDealRes walkExprs(SNodeList* pNodeList, ETraversalOrder order, FNodeWalk
|
||||||
}
|
}
|
||||||
|
|
||||||
void nodesWalkExpr(SNode* pNode, FNodeWalker walker, void* pContext) {
|
void nodesWalkExpr(SNode* pNode, FNodeWalker walker, void* pContext) {
|
||||||
(void)walkExpr(pNode, TRAVERSAL_PREORDER, walker, pContext);
|
EDealRes res;
|
||||||
|
res = walkExpr(pNode, TRAVERSAL_PREORDER, walker, pContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
void nodesWalkExprs(SNodeList* pNodeList, FNodeWalker walker, void* pContext) {
|
void nodesWalkExprs(SNodeList* pNodeList, FNodeWalker walker, void* pContext) {
|
||||||
(void)walkExprs(pNodeList, TRAVERSAL_PREORDER, walker, pContext);
|
EDealRes res;
|
||||||
|
res = walkExprs(pNodeList, TRAVERSAL_PREORDER, walker, pContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
void nodesWalkExprPostOrder(SNode* pNode, FNodeWalker walker, void* pContext) {
|
void nodesWalkExprPostOrder(SNode* pNode, FNodeWalker walker, void* pContext) {
|
||||||
(void)walkExpr(pNode, TRAVERSAL_POSTORDER, walker, pContext);
|
EDealRes res;
|
||||||
|
res = walkExpr(pNode, TRAVERSAL_POSTORDER, walker, pContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
void nodesWalkExprsPostOrder(SNodeList* pList, FNodeWalker walker, void* pContext) {
|
void nodesWalkExprsPostOrder(SNodeList* pList, FNodeWalker walker, void* pContext) {
|
||||||
(void)walkExprs(pList, TRAVERSAL_POSTORDER, walker, pContext);
|
EDealRes res;
|
||||||
|
res = walkExprs(pList, TRAVERSAL_POSTORDER, walker, pContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void checkParamIsFunc(SFunctionNode* pFunc) {
|
static void checkParamIsFunc(SFunctionNode* pFunc) {
|
||||||
|
|
|
@ -297,7 +297,7 @@ void taosArrayRemove(SArray* pArray, size_t index) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (index == pArray->size - 1) {
|
if (index == pArray->size - 1) {
|
||||||
(void)taosArrayPop(pArray);
|
void* t = taosArrayPop(pArray);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -911,7 +911,9 @@ int32_t cfgLoadFromEnvVar(SConfig *pConfig) {
|
||||||
|
|
||||||
strncpy(line, *pEnv, sizeof(line) - 1);
|
strncpy(line, *pEnv, sizeof(line) - 1);
|
||||||
pEnv++;
|
pEnv++;
|
||||||
(void)taosEnvToCfg(line, line);
|
if (taosEnvToCfg(line, line) < 0) {
|
||||||
|
uError("failed to convert env to cfg:%s", line);
|
||||||
|
}
|
||||||
|
|
||||||
(void)paGetToken(line, &name, &olen);
|
(void)paGetToken(line, &name, &olen);
|
||||||
if (olen == 0) continue;
|
if (olen == 0) continue;
|
||||||
|
@ -954,7 +956,9 @@ int32_t cfgLoadFromEnvCmd(SConfig *pConfig, const char **envCmd) {
|
||||||
while (envCmd[index] != NULL) {
|
while (envCmd[index] != NULL) {
|
||||||
strncpy(buf, envCmd[index], sizeof(buf) - 1);
|
strncpy(buf, envCmd[index], sizeof(buf) - 1);
|
||||||
buf[sizeof(buf) - 1] = 0;
|
buf[sizeof(buf) - 1] = 0;
|
||||||
(void)taosEnvToCfg(buf, buf);
|
if (taosEnvToCfg(buf, buf) < 0) {
|
||||||
|
uError("failed to convert env to cfg:%s", buf);
|
||||||
|
}
|
||||||
index++;
|
index++;
|
||||||
|
|
||||||
name = value = value2 = value3 = value4 = NULL;
|
name = value = value2 = value3 = value4 = NULL;
|
||||||
|
@ -1026,7 +1030,9 @@ int32_t cfgLoadFromEnvFile(SConfig *pConfig, const char *envFile) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (line[_bytes - 1] == '\n') line[_bytes - 1] = 0;
|
if (line[_bytes - 1] == '\n') line[_bytes - 1] = 0;
|
||||||
(void)taosEnvToCfg(line, line);
|
if (taosEnvToCfg(line, line) < 0) {
|
||||||
|
uError("failed to convert env to cfg:%s", line);
|
||||||
|
}
|
||||||
|
|
||||||
(void)paGetToken(line, &name, &olen);
|
(void)paGetToken(line, &name, &olen);
|
||||||
if (olen == 0) continue;
|
if (olen == 0) continue;
|
||||||
|
@ -1273,7 +1279,12 @@ int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) {
|
||||||
}
|
}
|
||||||
|
|
||||||
buf[fileSize] = 0;
|
buf[fileSize] = 0;
|
||||||
(void)taosLSeekFile(pFile, 0, SEEK_SET);
|
if (taosLSeekFile(pFile, 0, SEEK_SET) < 0) {
|
||||||
|
(void)taosCloseFile(&pFile);
|
||||||
|
(void)printf("load json file error: %s\n", filepath);
|
||||||
|
taosMemoryFreeClear(buf);
|
||||||
|
TAOS_RETURN(terrno);
|
||||||
|
}
|
||||||
if (taosReadFile(pFile, buf, fileSize) <= 0) {
|
if (taosReadFile(pFile, buf, fileSize) <= 0) {
|
||||||
(void)taosCloseFile(&pFile);
|
(void)taosCloseFile(&pFile);
|
||||||
(void)printf("load json file error: %s\n", filepath);
|
(void)printf("load json file error: %s\n", filepath);
|
||||||
|
|
|
@ -252,7 +252,9 @@ double tdigestQuantile(TDigest *t, double q) {
|
||||||
int64_t weight_so_far;
|
int64_t weight_so_far;
|
||||||
SCentroid *a, *b, tmp;
|
SCentroid *a, *b, tmp;
|
||||||
|
|
||||||
(void)tdigestCompress(t);
|
if (tdigestCompress(t) != 0) {
|
||||||
|
uError("failed to compress t-digest");
|
||||||
|
}
|
||||||
if (t->num_centroids == 0) return NAN;
|
if (t->num_centroids == 0) return NAN;
|
||||||
if (t->num_centroids == 1) return t->centroids[0].mean;
|
if (t->num_centroids == 1) return t->centroids[0].mean;
|
||||||
if (FLOAT_EQ(q, 0.0)) return t->min;
|
if (FLOAT_EQ(q, 0.0)) return t->min;
|
||||||
|
|
|
@ -838,7 +838,7 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p) {
|
||||||
taosHashRLock(pHashObj);
|
taosHashRLock(pHashObj);
|
||||||
|
|
||||||
int slot;
|
int slot;
|
||||||
(void)taosHashReleaseNode(pHashObj, p, &slot);
|
void *tp = taosHashReleaseNode(pHashObj, p, &slot);
|
||||||
|
|
||||||
SHashEntry *pe = pHashObj->hashList[slot];
|
SHashEntry *pe = pHashObj->hashList[slot];
|
||||||
|
|
||||||
|
|
|
@ -178,7 +178,6 @@ void *taosProcessSchedQueue(void *scheduler) {
|
||||||
(*(msg.tfp))(msg.ahandle, msg.thandle);
|
(*(msg.tfp))(msg.ahandle, msg.thandle);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -230,7 +229,9 @@ void taosCleanUpScheduler(void *param) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < pSched->numOfThreads; ++i) {
|
for (int32_t i = 0; i < pSched->numOfThreads; ++i) {
|
||||||
if (taosCheckPthreadValid(pSched->qthread[i])) {
|
if (taosCheckPthreadValid(pSched->qthread[i])) {
|
||||||
(void)tsem_post(&pSched->fullSem);
|
if (tsem_post(&pSched->fullSem) != 0) {
|
||||||
|
uError("post %s fullSem failed(%s)", pSched->label, strerror(terrno));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (int32_t i = 0; i < pSched->numOfThreads; ++i) {
|
for (int32_t i = 0; i < pSched->numOfThreads; ++i) {
|
||||||
|
@ -240,12 +241,17 @@ void taosCleanUpScheduler(void *param) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)tsem_destroy(&pSched->emptySem);
|
if (tsem_destroy(&pSched->emptySem) != 0) {
|
||||||
(void)tsem_destroy(&pSched->fullSem);
|
uError("failed to destroy %s emptySem", pSched->label);
|
||||||
|
}
|
||||||
|
if (tsem_destroy(&pSched->fullSem) != 0) {
|
||||||
|
uError("failed to destroy %s fullSem", pSched->label);
|
||||||
|
}
|
||||||
(void)taosThreadMutexDestroy(&pSched->queueMutex);
|
(void)taosThreadMutexDestroy(&pSched->queueMutex);
|
||||||
|
|
||||||
if (pSched->pTimer) {
|
if (pSched->pTimer) {
|
||||||
(void)taosTmrStop(pSched->pTimer);
|
bool r = taosTmrStop(pSched->pTimer);
|
||||||
|
uTrace("stop timer:%p, result:%d", pSched->pTimer, r);
|
||||||
pSched->pTimer = NULL;
|
pSched->pTimer = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -314,7 +314,9 @@ static void addToExpired(tmr_obj_t* head) {
|
||||||
schedMsg.msg = NULL;
|
schedMsg.msg = NULL;
|
||||||
schedMsg.ahandle = head;
|
schedMsg.ahandle = head;
|
||||||
schedMsg.thandle = NULL;
|
schedMsg.thandle = NULL;
|
||||||
(void)taosScheduleTask(tmrQhandle, &schedMsg);
|
if (taosScheduleTask(tmrQhandle, &schedMsg) != 0) {
|
||||||
|
tmrError("%s failed to add expired timer[id=%" PRIuPTR "] to queue.", head->ctrl->label, id);
|
||||||
|
}
|
||||||
|
|
||||||
tmrDebug("timer[id=%" PRIuPTR "] has been added to queue.", id);
|
tmrDebug("timer[id=%" PRIuPTR "] has been added to queue.", id);
|
||||||
head = next;
|
head = next;
|
||||||
|
@ -560,7 +562,9 @@ static int32_t taosTmrModuleInit(void) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr", NULL);
|
tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr", NULL);
|
||||||
(void)taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK);
|
if (taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK) != 0) {
|
||||||
|
tmrError("failed to initialize timer");
|
||||||
|
}
|
||||||
|
|
||||||
tmrDebug("timer module is initialized, number of threads: %d", taosTmrThreads);
|
tmrDebug("timer module is initialized, number of threads: %d", taosTmrThreads);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue