fix: remove void function call
This commit is contained in:
parent
3d579674b8
commit
a7269e4536
|
@ -79,7 +79,7 @@ void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp);
|
|||
int32_t taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize, void **item);
|
||||
void taosFreeQitem(void *pItem);
|
||||
int32_t taosWriteQitem(STaosQueue *queue, void *pItem);
|
||||
int32_t taosReadQitem(STaosQueue *queue, void **ppItem);
|
||||
void taosReadQitem(STaosQueue *queue, void **ppItem);
|
||||
bool taosQueueEmpty(STaosQueue *queue);
|
||||
void taosUpdateItemSize(STaosQueue *queue, int32_t items);
|
||||
int32_t taosQueueItemSize(STaosQueue *queue);
|
||||
|
|
|
@ -730,7 +730,7 @@ static void* monitorThreadFunc(void* param) {
|
|||
}
|
||||
|
||||
MonitorSlowLogData* slowLogData = NULL;
|
||||
(void)taosReadQitem(monitorQueue, (void**)&slowLogData);
|
||||
taosReadQitem(monitorQueue, (void**)&slowLogData);
|
||||
if (slowLogData != NULL) {
|
||||
if (slowLogData->type == SLOW_LOG_READ_BEGINNIG && quitCnt == 0) {
|
||||
if (slowLogData->pFile != NULL) {
|
||||
|
|
|
@ -706,7 +706,9 @@ void ctgProcessTimerEvent(void *param, void *tmrId) {
|
|||
int32_t code = ctgClearCacheEnqueue(NULL, true, false, false, false);
|
||||
if (code) {
|
||||
qError("clear cache enqueue failed, error:%s", tstrerror(code));
|
||||
(void)taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
|
||||
if (taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer)) {
|
||||
qError("reset catalog cache monitor timer error, timer stoppped");
|
||||
}
|
||||
}
|
||||
|
||||
goto _return;
|
||||
|
@ -714,7 +716,9 @@ void ctgProcessTimerEvent(void *param, void *tmrId) {
|
|||
}
|
||||
|
||||
qTrace("reset catalog timer");
|
||||
(void)taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
|
||||
if (taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer)) {
|
||||
qError("reset catalog cache monitor timer error, timer stoppped");
|
||||
}
|
||||
|
||||
_return:
|
||||
|
||||
|
@ -1517,10 +1521,16 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SC
|
|||
_return:
|
||||
|
||||
if (pJob) {
|
||||
(void)taosReleaseRef(gCtgMgmt.jobPool, pJob->refId);
|
||||
int32_t code2 = taosReleaseRef(gCtgMgmt.jobPool, pJob->refId);
|
||||
if (TSDB_CODE_SUCCESS) {
|
||||
qError("release catalog job refId %" PRId64 "falied, error:%s", pJob->refId, tstrerror(code2));
|
||||
}
|
||||
|
||||
if (code) {
|
||||
(void)taosRemoveRef(gCtgMgmt.jobPool, pJob->refId);
|
||||
code2 = taosRemoveRef(gCtgMgmt.jobPool, pJob->refId);
|
||||
if (TSDB_CODE_SUCCESS) {
|
||||
qError("remove catalog job refId %" PRId64 "falied, error:%s", pJob->refId, tstrerror(code2));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1967,7 +1977,9 @@ void catalogDestroy(void) {
|
|||
}
|
||||
|
||||
if (gCtgMgmt.cacheTimer) {
|
||||
(void)taosTmrStop(gCtgMgmt.cacheTimer);
|
||||
if (taosTmrStop(gCtgMgmt.cacheTimer)) {
|
||||
qTrace("stop catalog cache timer may failed");
|
||||
}
|
||||
gCtgMgmt.cacheTimer = NULL;
|
||||
taosTmrCleanUp(gCtgMgmt.timer);
|
||||
gCtgMgmt.timer = NULL;
|
||||
|
|
|
@ -1007,7 +1007,11 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
|
|||
CTG_ERR_JRET(terrno);
|
||||
}
|
||||
|
||||
(void)taosAcquireRef(gCtgMgmt.jobPool, pJob->refId);
|
||||
void* p = taosAcquireRef(gCtgMgmt.jobPool, pJob->refId);
|
||||
if (NULL == p) {
|
||||
ctgError("acquire job from ref failed, refId:%" PRId64 ", error: %s", pJob->refId, tstrerror(terrno));
|
||||
CTG_ERR_JRET(terrno);
|
||||
}
|
||||
|
||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||
qDebug("qid:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d, elapsed time:%.2f ms",
|
||||
|
@ -1406,7 +1410,11 @@ int32_t ctgCallUserCb(void* param) {
|
|||
|
||||
qDebug("qid:0x%" PRIx64 " ctg end to call user cb", pJob->queryId);
|
||||
|
||||
(void)taosRemoveRef(gCtgMgmt.jobPool, pJob->refId);
|
||||
int64_t refId = pJob->refId;
|
||||
int32_t code = taosRemoveRef(gCtgMgmt.jobPool, refId);
|
||||
if (code) {
|
||||
qError("qid:0x%" PRIx64 " remove ctg job %" PRId64 " from jobPool failed, error:%s", pJob->queryId, refId, tstrerror(code));
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -2915,21 +2915,27 @@ void ctgClearMetaCache(SCtgCacheOperation *operation) {
|
|||
|
||||
if (CTG_CACHE_LOW(remainSize, cacheMaxSize)) {
|
||||
qDebug("catalog finish meta clear, remainSize:%" PRId64 ", cacheMaxSize:%dMB", remainSize, cacheMaxSize);
|
||||
(void)taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
|
||||
if (taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer)) {
|
||||
qError("reset catalog cache monitor timer error, timer stoppped");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (!roundDone) {
|
||||
qDebug("catalog all meta cleared, remainSize:%" PRId64 ", cacheMaxSize:%dMB, to clear handle", remainSize, cacheMaxSize);
|
||||
ctgClearFreeCache(operation);
|
||||
(void)taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
|
||||
if (taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer)) {
|
||||
qError("reset catalog cache monitor timer error, timer stoppped");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t code = ctgClearCacheEnqueue(NULL, true, false, false, false);
|
||||
if (code) {
|
||||
qError("clear cache enqueue failed, error:%s", tstrerror(code));
|
||||
(void)taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
|
||||
if (taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer)) {
|
||||
qError("reset catalog cache monitor timer error, timer stoppped");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2993,7 +2999,10 @@ int32_t ctgOpDropTbTSMA(SCtgCacheOperation *operation) {
|
|||
pCtgCache->pTsmas = NULL;
|
||||
|
||||
ctgDebug("all tsmas for table dropped: %s.%s", msg->dbFName, msg->tbName);
|
||||
(void)taosHashRemove(dbCache->tsmaCache, msg->tbName, TSDB_TABLE_NAME_LEN);
|
||||
code = taosHashRemove(dbCache->tsmaCache, msg->tbName, TSDB_TABLE_NAME_LEN);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
ctgError("remove table %s.%s from tsmaCache failed, error:%s", msg->dbFName, msg->tbName, tstrerror(code));
|
||||
}
|
||||
|
||||
CTG_UNLOCK(CTG_WRITE, &pCtgCache->tsmaLock);
|
||||
} else {
|
||||
|
@ -3191,6 +3200,7 @@ void ctgCleanupCacheQueue(void) {
|
|||
SCtgQNode *nodeNext = NULL;
|
||||
SCtgCacheOperation *op = NULL;
|
||||
bool stopQueue = false;
|
||||
int32_t code = 0;
|
||||
|
||||
while (true) {
|
||||
node = gCtgMgmt.queue.head->next;
|
||||
|
@ -3209,7 +3219,10 @@ void ctgCleanupCacheQueue(void) {
|
|||
}
|
||||
|
||||
if (op->syncOp) {
|
||||
(void)tsem_post(&op->rspSem);
|
||||
code = tsem_post(&op->rspSem);
|
||||
if (code) {
|
||||
qError("tsem_post failed when cleanup cache queue, error:%s", tstrerror(code));
|
||||
}
|
||||
} else {
|
||||
taosMemoryFree(op);
|
||||
}
|
||||
|
@ -3234,12 +3247,13 @@ void ctgCleanupCacheQueue(void) {
|
|||
|
||||
void *ctgUpdateThreadFunc(void *param) {
|
||||
setThreadName("catalog");
|
||||
int32_t code = 0;
|
||||
|
||||
qInfo("catalog update thread started");
|
||||
|
||||
while (true) {
|
||||
if (tsem_wait(&gCtgMgmt.queue.reqSem)) {
|
||||
qError("ctg tsem_wait failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||
qError("ctg tsem_wait failed, error:%s", tstrerror(terrno));
|
||||
}
|
||||
|
||||
if (atomic_load_8((int8_t *)&gCtgMgmt.queue.stopQueue)) {
|
||||
|
@ -3256,7 +3270,10 @@ void *ctgUpdateThreadFunc(void *param) {
|
|||
(void)(*gCtgCacheOperation[operation->opId].func)(operation); // ignore any error
|
||||
|
||||
if (operation->syncOp) {
|
||||
(void)tsem_post(&operation->rspSem);
|
||||
code = tsem_post(&operation->rspSem);
|
||||
if (code) {
|
||||
ctgError("tsem_post failed for syncOp update, error:%s", tstrerror(code));
|
||||
}
|
||||
} else {
|
||||
taosMemoryFreeClear(operation);
|
||||
}
|
||||
|
|
|
@ -470,7 +470,10 @@ _return:
|
|||
taosMemoryFree(pMsg->pEpSet);
|
||||
|
||||
if (pJob) {
|
||||
(void)taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId);
|
||||
int32_t code2 = taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId);
|
||||
if (code2) {
|
||||
qError("release ctg job refId:%" PRId64 " failed, error:%s", cbParam->refId, tstrerror(code2));
|
||||
}
|
||||
}
|
||||
|
||||
CTG_API_LEAVE(code);
|
||||
|
@ -1087,11 +1090,16 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* n
|
|||
int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
|
||||
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
(void)tNameExtractFullName(name, tbFName);
|
||||
|
||||
ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName);
|
||||
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp);
|
||||
int32_t code = tNameExtractFullName(name, tbFName);
|
||||
if (code) {
|
||||
ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), name->type, name->dbname, name->tname);
|
||||
CTG_ERR_RET(code);
|
||||
}
|
||||
|
||||
code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp);
|
||||
if (code) {
|
||||
ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
|
||||
CTG_ERR_RET(code);
|
||||
|
@ -1403,17 +1411,23 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
|
|||
int32_t msgLen = 0;
|
||||
int32_t reqType = TDMT_VND_TABLE_CFG;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
(void)tNameExtractFullName(pTableName, tbFName);
|
||||
|
||||
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
(void)tNameGetFullDbName(pTableName, dbFName);
|
||||
SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
|
||||
|
||||
int32_t code = tNameExtractFullName(pTableName, tbFName);
|
||||
if (code) {
|
||||
ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTableName->type, pTableName->dbname, pTableName->tname);
|
||||
CTG_ERR_RET(code);
|
||||
}
|
||||
|
||||
SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
|
||||
ctgDebug("try to get table cfg from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId,
|
||||
vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
|
||||
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
|
||||
code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
|
||||
if (code) {
|
||||
ctgError("Build get tb cfg msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
|
||||
CTG_ERR_RET(code);
|
||||
|
@ -1471,15 +1485,20 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
|
|||
int32_t msgLen = 0;
|
||||
int32_t reqType = TDMT_MND_TABLE_CFG;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
(void)tNameExtractFullName(pTableName, tbFName);
|
||||
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
(void)tNameGetFullDbName(pTableName, dbFName);
|
||||
SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
|
||||
|
||||
int32_t code = tNameExtractFullName(pTableName, tbFName);
|
||||
if (code) {
|
||||
ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTableName->type, pTableName->dbname, pTableName->tname);
|
||||
CTG_ERR_RET(code);
|
||||
}
|
||||
|
||||
ctgDebug("try to get table cfg from mnode, tbFName:%s", tbFName);
|
||||
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
|
||||
code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
|
||||
if (code) {
|
||||
ctgError("Build get tb cfg msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
|
||||
CTG_ERR_RET(code);
|
||||
|
@ -1583,11 +1602,15 @@ int32_t ctgGetViewInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName*
|
|||
SCtgTask* pTask = tReq ? tReq->pTask : NULL;
|
||||
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
|
||||
char fullName[TSDB_TABLE_FNAME_LEN];
|
||||
(void)tNameExtractFullName(pName, fullName);
|
||||
int32_t code = tNameExtractFullName(pName, fullName);
|
||||
if (code) {
|
||||
ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pName->type, pName->dbname, pName->tname);
|
||||
CTG_ERR_RET(code);
|
||||
}
|
||||
|
||||
ctgDebug("try to get view info from mnode, viewFName:%s", fullName);
|
||||
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](fullName, &msg, 0, &msgLen, mallocFp);
|
||||
code = queryBuildMsg[TMSG_INDEX(reqType)](fullName, &msg, 0, &msgLen, mallocFp);
|
||||
if (code) {
|
||||
ctgError("Build view-meta msg failed, code:%x, viewFName:%s", code, fullName);
|
||||
CTG_ERR_RET(code);
|
||||
|
@ -1640,11 +1663,15 @@ int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa
|
|||
SCtgTask* pTask = tReq ? tReq->pTask : NULL;
|
||||
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
(void)tNameExtractFullName(name, tbFName);
|
||||
int32_t code = tNameExtractFullName(name, tbFName);
|
||||
if (code) {
|
||||
ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), name->type, name->dbname, name->tname);
|
||||
CTG_ERR_RET(code);
|
||||
}
|
||||
|
||||
ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName);
|
||||
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp);
|
||||
code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp);
|
||||
if (code) {
|
||||
ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
|
||||
CTG_ERR_RET(code);
|
||||
|
@ -1697,7 +1724,12 @@ int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, c
|
|||
int32_t msgLen = 0;
|
||||
int32_t reqType = TDMT_VND_GET_STREAM_PROGRESS;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
(void)tNameExtractFullName(pTbName, tbFName);
|
||||
int32_t code = tNameExtractFullName(pTbName, tbFName);
|
||||
if (code) {
|
||||
ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTbName->type, pTbName->dbname, pTbName->tname);
|
||||
CTG_ERR_RET(code);
|
||||
}
|
||||
|
||||
SCtgTask* pTask = tReq ? tReq->pTask : NULL;
|
||||
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
|
||||
|
||||
|
@ -1705,7 +1737,7 @@ int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, c
|
|||
ctgDebug("try to get stream progress from vnode, vgId:%d, ep num:%d, ep %s:%d, target:%s", vgroupInfo->vgId,
|
||||
vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
|
||||
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](bInput, &msg, 0, &msgLen, mallocFp);
|
||||
code = queryBuildMsg[TMSG_INDEX(reqType)](bInput, &msg, 0, &msgLen, mallocFp);
|
||||
if (code) {
|
||||
ctgError("Build get stream progress failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
|
||||
CTG_ERR_RET(code);
|
||||
|
|
|
@ -433,6 +433,7 @@ void ctgFreeHandle(SCatalog* pCtg) {
|
|||
|
||||
void ctgClearHandleMeta(SCatalog* pCtg, int64_t* pClearedSize, int64_t* pCleardNum, bool* roundDone) {
|
||||
int64_t cacheSize = 0;
|
||||
int32_t code = 0;
|
||||
void* pIter = taosHashIterate(pCtg->dbCache, NULL);
|
||||
while (pIter) {
|
||||
SCtgDBCache* dbCache = pIter;
|
||||
|
@ -447,7 +448,10 @@ void ctgClearHandleMeta(SCatalog* pCtg, int64_t* pClearedSize, int64_t* pCleardN
|
|||
continue;
|
||||
}
|
||||
|
||||
(void)taosHashRemove(dbCache->tbCache, key, len);
|
||||
code = taosHashRemove(dbCache->tbCache, key, len);
|
||||
if (code) {
|
||||
qError("taosHashRemove table cache failed, key:%s, len:%d, error:%s", (char*)key, (int32_t)len, tstrerror(code));
|
||||
}
|
||||
|
||||
cacheSize =
|
||||
len + sizeof(SCtgTbCache) + ctgGetTbMetaCacheSize(pCache->pMeta) + ctgGetTbIndexCacheSize(pCache->pIndex);
|
||||
|
@ -1201,7 +1205,11 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SEpSet* pMgmtEps, SDBVgInfo* d
|
|||
|
||||
SVgroupInfo* vgInfo = NULL;
|
||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
||||
(void)tNameExtractFullName(pTableName, tbFullName);
|
||||
code = tNameExtractFullName(pTableName, tbFullName);
|
||||
if (code) {
|
||||
ctgError("tNameExtractFullName failed, error:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTableName->type, pTableName->dbname, pTableName->tname);
|
||||
CTG_ERR_RET(code);
|
||||
}
|
||||
|
||||
uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
|
||||
dbInfo->hashPrefix, dbInfo->hashSuffix);
|
||||
|
@ -1965,7 +1973,12 @@ int32_t ctgChkSetTbAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res) {
|
|||
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
(void)tNameExtractFullName(&req->pRawReq->tbName, tbFName);
|
||||
code = tNameExtractFullName(&req->pRawReq->tbName, tbFName);
|
||||
if (code) {
|
||||
ctgError("tNameExtractFullName failed, error:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), req->pRawReq->tbName.type, req->pRawReq->tbName.dbname, req->pRawReq->tbName.tname);
|
||||
CTG_ERR_RET(code);
|
||||
}
|
||||
|
||||
(void)tNameGetFullDbName(&req->pRawReq->tbName, dbFName);
|
||||
|
||||
while (true) {
|
||||
|
@ -2151,7 +2164,11 @@ int32_t ctgChkSetViewAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res)
|
|||
if (IS_SYS_DBNAME(req->pRawReq->tbName.dbname)) {
|
||||
(void)snprintf(viewFName, sizeof(viewFName), "%s.%s", req->pRawReq->tbName.dbname, req->pRawReq->tbName.tname);
|
||||
} else {
|
||||
(void)tNameExtractFullName(&req->pRawReq->tbName, viewFName);
|
||||
code = tNameExtractFullName(&req->pRawReq->tbName, viewFName);
|
||||
if (code) {
|
||||
ctgError("tNameExtractFullName failed, error:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), req->pRawReq->tbName.type, req->pRawReq->tbName.dbname, req->pRawReq->tbName.tname);
|
||||
CTG_ERR_RET(code);
|
||||
}
|
||||
}
|
||||
int32_t len = strlen(viewFName) + 1;
|
||||
|
||||
|
|
|
@ -190,7 +190,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRaw
|
|||
}
|
||||
|
||||
SDataDeleterBuf* pBuf = NULL;
|
||||
(void)taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
|
||||
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
|
||||
if (pBuf != NULL) {
|
||||
TAOS_MEMCPY(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf));
|
||||
taosFreeQitem(pBuf);
|
||||
|
@ -248,7 +248,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
|
|||
taosMemoryFree(pDeleter->pParam);
|
||||
while (!taosQueueEmpty(pDeleter->pDataBlocks)) {
|
||||
SDataDeleterBuf* pBuf = NULL;
|
||||
(void)taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
|
||||
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
|
||||
|
||||
if (pBuf != NULL) {
|
||||
taosMemoryFreeClear(pBuf->pData);
|
||||
|
|
|
@ -233,7 +233,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRow
|
|||
}
|
||||
|
||||
SDataDispatchBuf* pBuf = NULL;
|
||||
(void)taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
|
||||
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
|
||||
if (pBuf != NULL) {
|
||||
TAOS_MEMCPY(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
|
||||
taosFreeQitem(pBuf);
|
||||
|
@ -291,7 +291,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
|
|||
|
||||
while (!taosQueueEmpty(pDispatcher->pDataBlocks)) {
|
||||
SDataDispatchBuf* pBuf = NULL;
|
||||
(void)taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
|
||||
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
|
||||
if (pBuf != NULL) {
|
||||
taosMemoryFreeClear(pBuf->pData);
|
||||
taosFreeQitem(pBuf);
|
||||
|
|
|
@ -57,6 +57,7 @@ typedef struct SSubmitRspParam {
|
|||
int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
|
||||
SSubmitRspParam* pParam = (SSubmitRspParam*)param;
|
||||
SDataInserterHandle* pInserter = pParam->pInserter;
|
||||
int32_t code2 = 0;
|
||||
|
||||
if (code) {
|
||||
pInserter->submitRes.code = code;
|
||||
|
@ -106,7 +107,14 @@ int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
|
||||
_return:
|
||||
|
||||
(void)tsem_post(&pInserter->ready);
|
||||
code2 = tsem_post(&pInserter->ready);
|
||||
if (code2 < 0) {
|
||||
qError("tsem_post inserter ready failed, error:%s", tstrerror(code2));
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pInserter->submitRes.code = code2;
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFree(pMsg->pData);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -561,6 +561,7 @@ static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPost
|
|||
|
||||
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo* pStbJoin) {
|
||||
SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
|
||||
int32_t code = 0;
|
||||
|
||||
pPost->isStarted = false;
|
||||
|
||||
|
@ -571,7 +572,11 @@ static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDy
|
|||
if (pPost->leftNeedCache) {
|
||||
uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
|
||||
if (num && --(*num) <= 0) {
|
||||
(void)tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
|
||||
code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
|
||||
if (code) {
|
||||
qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
|
||||
QRY_ERR_RET(code);
|
||||
}
|
||||
QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
|
||||
}
|
||||
}
|
||||
|
@ -579,7 +584,11 @@ static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDy
|
|||
if (!pPost->rightNeedCache) {
|
||||
void* v = tSimpleHashGet(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
|
||||
if (NULL != v) {
|
||||
(void)tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
|
||||
code = tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
|
||||
if (code) {
|
||||
qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
|
||||
QRY_ERR_RET(code);
|
||||
}
|
||||
QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
|
||||
}
|
||||
}
|
||||
|
@ -660,7 +669,11 @@ static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnc
|
|||
break;
|
||||
default:
|
||||
if (1 == (*pNum)) {
|
||||
(void)tSimpleHashRemove(pOnceHash, pKey, keySize);
|
||||
code = tSimpleHashRemove(pOnceHash, pKey, keySize);
|
||||
if (code) {
|
||||
qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
|
||||
QRY_ERR_RET(code);
|
||||
}
|
||||
}
|
||||
(*pNum)++;
|
||||
break;
|
||||
|
@ -811,8 +824,12 @@ static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
|
|||
|
||||
uint64_t* pUid = NULL;
|
||||
int32_t iter = 0;
|
||||
int32_t code = 0;
|
||||
while (NULL != (pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter))) {
|
||||
(void)tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid));
|
||||
code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid));
|
||||
if (code) {
|
||||
qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache);
|
||||
|
|
|
@ -30,7 +30,9 @@
|
|||
|
||||
static void removeGroupCacheFile(SGroupCacheFileInfo* pFileInfo) {
|
||||
if (pFileInfo->fd.fd) {
|
||||
(void)taosCloseFile(&pFileInfo->fd.fd);
|
||||
if (taosCloseFile(&pFileInfo->fd.fd) < 0) {
|
||||
qError("close group cache file failed, fd:%p, error:%s", pFileInfo->fd.fd, tstrerror(terrno));
|
||||
}
|
||||
pFileInfo->fd.fd = NULL;
|
||||
(void)taosThreadMutexDestroy(&pFileInfo->fd.mutex);
|
||||
}
|
||||
|
@ -92,7 +94,9 @@ static void logGroupCacheExecInfo(SGroupCacheOperatorInfo* pGrpCacheOperator) {
|
|||
static void freeSGcSessionCtx(void* p) {
|
||||
SGcSessionCtx* pSession = p;
|
||||
if (pSession->semInit) {
|
||||
(void)tsem_destroy(&pSession->waitSem);
|
||||
if (tsem_destroy(&pSession->waitSem) < 0) {
|
||||
qError("tsem_destroy session waitSem failed, error:%s", tstrerror(terrno));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -262,6 +266,9 @@ static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, int32_t fileId, S
|
|||
}
|
||||
|
||||
static FORCE_INLINE void releaseFdToFileCtx(SGroupCacheFileFd* pFd) {
|
||||
if (NULL == pFd) {
|
||||
return;
|
||||
}
|
||||
(void)taosThreadMutexUnlock(&pFd->mutex);
|
||||
}
|
||||
|
||||
|
@ -274,6 +281,8 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC
|
|||
SGroupCacheData* pGroup = NULL;
|
||||
|
||||
while (NULL != pHead) {
|
||||
pFd = NULL;
|
||||
|
||||
if (pGCache->batchFetch) {
|
||||
pFileCtx = &pHead->pCtx->fileCtx;
|
||||
} else {
|
||||
|
@ -290,7 +299,11 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC
|
|||
|
||||
int64_t blkId = pHead->basic.blkId;
|
||||
pHead = pHead->next;
|
||||
(void)taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
|
||||
code = taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
|
||||
if (code) {
|
||||
qError("taosHashRemove blk %" PRId64 " from diryBlk failed, error:%s", blkId, tstrerror(code));
|
||||
goto _return;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -304,13 +317,19 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC
|
|||
}
|
||||
|
||||
if (deleted) {
|
||||
releaseFdToFileCtx(pFd);
|
||||
|
||||
qTrace("FileId:%d-%d-%d already be deleted, skip write",
|
||||
pCtx->id, pGroup ? pGroup->vgId : GROUP_CACHE_DEFAULT_VGID, pHead->basic.fileId);
|
||||
|
||||
int64_t blkId = pHead->basic.blkId;
|
||||
pHead = pHead->next;
|
||||
|
||||
(void)taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
|
||||
code = taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
|
||||
if (code) {
|
||||
qError("taosHashRemove blk %" PRId64 " from diryBlk failed, error:%s", blkId, tstrerror(code));
|
||||
goto _return;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -336,7 +355,11 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC
|
|||
int64_t blkId = pHead->basic.blkId;
|
||||
pHead = pHead->next;
|
||||
|
||||
(void)taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
|
||||
code = taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
|
||||
if (code) {
|
||||
qError("taosHashRemove blk %" PRId64 " from diryBlk failed, error:%s", blkId, tstrerror(code));
|
||||
goto _return;
|
||||
}
|
||||
}
|
||||
|
||||
_return:
|
||||
|
@ -1036,7 +1059,11 @@ static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator
|
|||
}
|
||||
SGcSessionCtx* pWaitCtx = *ppWaitCtx;
|
||||
pWaitCtx->newFetch = true;
|
||||
(void)taosHashRemove(pCtx->pWaitSessions, pSessionId, sizeof(*pSessionId));
|
||||
code = taosHashRemove(pCtx->pWaitSessions, pSessionId, sizeof(*pSessionId));
|
||||
if (code) {
|
||||
qError("taosHashRemove session %" PRId64 " from waitSession failed, error: %s", pSessionId, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
QRY_ERR_RET(tsem_post(&pWaitCtx->waitSem));
|
||||
|
||||
return code;
|
||||
|
@ -1125,14 +1152,22 @@ static int32_t groupCacheSessionWait(struct SOperatorInfo* pOperator, SGcDownstr
|
|||
|
||||
QRY_ERR_JRET(taosHashPut(pCtx->pWaitSessions, &sessionId, sizeof(sessionId), &pSession, POINTER_BYTES));
|
||||
|
||||
(void)tsem_wait(&pSession->waitSem);
|
||||
code = tsem_wait(&pSession->waitSem);
|
||||
if (code) {
|
||||
qError("tsem_wait failed, error:%s", tstrerror(code));
|
||||
QRY_ERR_JRET(code);
|
||||
}
|
||||
|
||||
if (pSession->newFetch) {
|
||||
pSession->newFetch = false;
|
||||
return getCacheBlkFromDownstreamOperator(pOperator, pCtx, sessionId, pSession, ppRes);
|
||||
}
|
||||
|
||||
(void)taosHashRemove(pCtx->pWaitSessions, &sessionId, sizeof(sessionId));
|
||||
code = taosHashRemove(pCtx->pWaitSessions, &sessionId, sizeof(sessionId));
|
||||
if (code) {
|
||||
qError("taosHashRemove session %" PRId64 " from waitSession failed, error: %s", sessionId, tstrerror(code));
|
||||
QRY_ERR_JRET(code);
|
||||
}
|
||||
|
||||
bool got = false;
|
||||
return getBlkFromSessionCacheImpl(pOperator, sessionId, pSession, ppRes, &got);
|
||||
|
@ -1278,14 +1313,22 @@ static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock
|
|||
SSDataBlock** ppBlock = taosHashGet(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
|
||||
if (ppBlock) {
|
||||
QRY_ERR_RET(releaseBaseBlockToList(pCtx, *ppBlock));
|
||||
(void)taosHashRemove(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
|
||||
code = taosHashRemove(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
|
||||
if (code) {
|
||||
qError("taosHashRemove session %" PRId64 " from pReadBlk failed, error: %s", pGcParam->sessionId, tstrerror(code));
|
||||
QRY_ERR_RET(code);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
code = getBlkFromSessionCache(pOperator, pGcParam->sessionId, pSession, ppRes);
|
||||
if (NULL == *ppRes) {
|
||||
qDebug("session %" PRId64 " in downstream %d total got %" PRId64 " rows", pGcParam->sessionId, pCtx->id, pSession->resRows);
|
||||
(void)taosHashRemove(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
|
||||
code = taosHashRemove(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
|
||||
if (code) {
|
||||
qError("taosHashRemove session %" PRId64 " from pSessions failed, error: %s", pGcParam->sessionId, tstrerror(code));
|
||||
QRY_ERR_RET(code);
|
||||
}
|
||||
} else {
|
||||
pSession->resRows += (*ppRes)->info.rows;
|
||||
qDebug("session %" PRId64 " in downstream %d got %" PRId64 " rows in one block", pGcParam->sessionId, pCtx->id, (*ppRes)->info.rows);
|
||||
|
|
|
@ -2413,7 +2413,10 @@ int32_t mAsofForwardChkFillGrpCache(SMJoinWindowCtx* pCtx) {
|
|||
pGrp->readIdx = 0;
|
||||
//pGrp->endIdx = pGrp->blk->info.rows - 1;
|
||||
} else {
|
||||
(void)taosArrayPop(pCache->grps);
|
||||
if (NULL == taosArrayPop(pCache->grps)) {
|
||||
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
pGrp = taosArrayGet(pCache->grps, 0);
|
||||
if (NULL == pGrp) {
|
||||
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||
|
|
|
@ -1375,7 +1375,9 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol
|
|||
_return:
|
||||
|
||||
if (pTable->noKeepEqGrpRows || !keepGrp || (!pTable->multiEqGrpRows && !restart)) {
|
||||
(void)taosArrayPop(pTable->eqGrps);
|
||||
if (NULL == taosArrayPop(pTable->eqGrps)) {
|
||||
code = terrno;
|
||||
}
|
||||
} else {
|
||||
pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1;
|
||||
}
|
||||
|
|
|
@ -581,7 +581,10 @@ void qwDestroyImpl(void *pMgmt) {
|
|||
int32_t schStatusCount = 0;
|
||||
qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt);
|
||||
|
||||
(void)taosTmrStop(mgmt->hbTimer); //ignore error
|
||||
if (taosTmrStop(mgmt->hbTimer)) {
|
||||
qTrace("stop qworker hb timer may failed");
|
||||
}
|
||||
|
||||
mgmt->hbTimer = NULL;
|
||||
taosTmrCleanUp(mgmt->timer);
|
||||
|
||||
|
|
|
@ -1187,7 +1187,9 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
|
|||
int32_t schNum = taosHashGetSize(mgmt->schHash);
|
||||
if (schNum <= 0) {
|
||||
QW_UNLOCK(QW_READ, &mgmt->schLock);
|
||||
(void)taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); // ignore error
|
||||
if (taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer)) {
|
||||
qError("reset qworker hb timer error, timer stoppped");
|
||||
}
|
||||
(void)qwRelease(refId); // ignore error
|
||||
return;
|
||||
}
|
||||
|
@ -1199,7 +1201,9 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
|
|||
taosMemoryFree(rspList);
|
||||
taosArrayDestroy(pExpiredSch);
|
||||
QW_ELOG("calloc %d SQWHbInfo failed, code:%x", schNum, terrno);
|
||||
(void)taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); // ignore error
|
||||
if (taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer)) {
|
||||
qError("reset qworker hb timer error, timer stoppped");
|
||||
}
|
||||
(void)qwRelease(refId); // ignore error
|
||||
return;
|
||||
}
|
||||
|
@ -1255,7 +1259,10 @@ _return:
|
|||
taosMemoryFreeClear(rspList);
|
||||
taosArrayDestroy(pExpiredSch);
|
||||
|
||||
(void)taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); // ignore error
|
||||
if (taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer)) {
|
||||
qError("reset qworker hb timer error, timer stoppped");
|
||||
}
|
||||
|
||||
(void)qwRelease(refId); // ignore error
|
||||
}
|
||||
|
||||
|
|
|
@ -414,7 +414,9 @@ extern SSchedulerMgmt schMgmt;
|
|||
#define SCH_LOG_TASK_START_TS(_task) \
|
||||
do { \
|
||||
int64_t us = taosGetTimestampUs(); \
|
||||
(void)taosArrayPush((_task)->profile.execTime, &us); \
|
||||
if (NULL == taosArrayPush((_task)->profile.execTime, &us)) { \
|
||||
qError("taosArrayPush task execTime failed, error:%s", tstrerror(terrno)); \
|
||||
} \
|
||||
if (0 == (_task)->execId) { \
|
||||
(_task)->profile.startTs = us; \
|
||||
} \
|
||||
|
|
|
@ -512,6 +512,7 @@ int32_t schNotifyUserFetchRes(SSchJob *pJob) {
|
|||
}
|
||||
|
||||
void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) {
|
||||
int32_t code = 0;
|
||||
SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
|
||||
|
||||
if (SCH_OP_NULL == pJob->opStatus.op) {
|
||||
|
@ -526,7 +527,10 @@ void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) {
|
|||
|
||||
if (SCH_JOB_IN_SYNC_OP(pJob)) {
|
||||
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
|
||||
(void)tsem_post(&pJob->rspSem); // ignore error
|
||||
code = tsem_post(&pJob->rspSem);
|
||||
if (code) {
|
||||
ctgError("tsem_post failed for syncOp, error:%s", tstrerror(code));
|
||||
}
|
||||
} else if (SCH_JOB_IN_ASYNC_EXEC_OP(pJob)) {
|
||||
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
|
||||
(void)schNotifyUserExecRes(pJob); // ignore error
|
||||
|
@ -771,7 +775,10 @@ void schFreeJobImpl(void *job) {
|
|||
taosMemoryFreeClear(pJob->userRes.execRes);
|
||||
taosMemoryFreeClear(pJob->fetchRes);
|
||||
taosMemoryFreeClear(pJob->sql);
|
||||
(void)tsem_destroy(&pJob->rspSem); // ignore error
|
||||
int32_t code = tsem_destroy(&pJob->rspSem);
|
||||
if (code) {
|
||||
qError("tsem_destroy failed, error:%s", tstrerror(code));
|
||||
}
|
||||
taosMemoryFree(pJob);
|
||||
|
||||
int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
|
||||
|
@ -790,7 +797,12 @@ int32_t schJobFetchRows(SSchJob *pJob) {
|
|||
|
||||
if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
|
||||
SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
|
||||
(void)tsem_wait(&pJob->rspSem); // ignore error
|
||||
code = tsem_wait(&pJob->rspSem);
|
||||
if (code) {
|
||||
qError("tsem_wait for fetch rspSem failed, error:%s", tstrerror(code));
|
||||
SCH_RET(code);
|
||||
}
|
||||
|
||||
SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
|
||||
}
|
||||
} else {
|
||||
|
@ -895,7 +907,10 @@ _return:
|
|||
} else if (pJob->refId < 0) {
|
||||
schFreeJobImpl(pJob);
|
||||
} else {
|
||||
(void)taosRemoveRef(schMgmt.jobRef, pJob->refId); // ignore error
|
||||
code = taosRemoveRef(schMgmt.jobRef, pJob->refId);
|
||||
if (code) {
|
||||
SCH_JOB_DLOG("taosRemoveRef job refId:0x%" PRIx64 " from jobRef, error:%s", pJob->refId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
SCH_RET(code);
|
||||
|
@ -909,7 +924,11 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
|
|||
|
||||
if (pReq->syncReq) {
|
||||
SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
|
||||
(void)tsem_wait(&pJob->rspSem); // ignore error
|
||||
code = tsem_wait(&pJob->rspSem);
|
||||
if (code) {
|
||||
qError("qid:0x%" PRIx64 " tsem_wait sync rspSem failed, error:%s", pReq->pDag->queryId, tstrerror(code));
|
||||
SCH_ERR_RET(code);
|
||||
}
|
||||
}
|
||||
|
||||
SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);
|
||||
|
|
|
@ -422,7 +422,9 @@ void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) {
|
|||
|
||||
schDropTaskOnExecNode(pJob, pTask);
|
||||
if (pTask->delayTimer) {
|
||||
(void)taosTmrStopA(&pTask->delayTimer); // ignore error
|
||||
if (!taosTmrStopA(&pTask->delayTimer)) {
|
||||
SCH_TASK_WLOG("stop task delayTimer failed, may stopped, status:%d", pTask->status);
|
||||
}
|
||||
}
|
||||
taosHashClear(pTask->execNodes);
|
||||
(void)schRemoveTaskFromExecList(pJob, pTask); // ignore error
|
||||
|
@ -1319,7 +1321,9 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
(void)taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer, &pTask->delayTimer);
|
||||
if (taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer, &pTask->delayTimer)) {
|
||||
SCH_TASK_ELOG("taosTmrReset delayExec timer failed, handle:%p", schMgmt.timer);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1350,7 +1354,9 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
|
|||
|
||||
SCH_LOCK_TASK(pTask);
|
||||
if (pTask->delayTimer) {
|
||||
(void)taosTmrStopA(&pTask->delayTimer);
|
||||
if (!taosTmrStopA(&pTask->delayTimer)) {
|
||||
SCH_TASK_WLOG("stop delayTimer failed, status:%d", pTask->delayTimer);
|
||||
}
|
||||
}
|
||||
schDropTaskOnExecNode(pJob, pTask);
|
||||
SCH_UNLOCK_TASK(pTask);
|
||||
|
|
|
@ -86,6 +86,7 @@ void schFreeHbTrans(SSchHbTrans *pTrans) {
|
|||
}
|
||||
|
||||
void schCleanClusterHb(void *pTrans) {
|
||||
int32_t code = 0;
|
||||
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
|
||||
|
||||
SSchHbTrans *hb = taosHashIterate(schMgmt.hbConnections, NULL);
|
||||
|
@ -93,7 +94,10 @@ void schCleanClusterHb(void *pTrans) {
|
|||
if (hb->trans.pTrans == pTrans) {
|
||||
SQueryNodeEpId *pEpId = taosHashGetKey(hb, NULL);
|
||||
schFreeHbTrans(hb);
|
||||
(void)taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
|
||||
code = taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
|
||||
if (code) {
|
||||
qError("taosHashRemove hb connection failed, error:%s", tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
hb = taosHashIterate(schMgmt.hbConnections, hb);
|
||||
|
@ -116,7 +120,10 @@ int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *ep
|
|||
int64_t taskNum = atomic_load_64(&hb->taskNum);
|
||||
if (taskNum <= 0) {
|
||||
schFreeHbTrans(hb);
|
||||
(void)taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
|
||||
code = taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
|
||||
if (code) {
|
||||
SCH_TASK_WLOG("taosHashRemove hb connection failed, error:%s", tstrerror(code));
|
||||
}
|
||||
}
|
||||
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
|
||||
|
||||
|
|
|
@ -184,6 +184,7 @@ void schedulerFreeJob(int64_t *jobId, int32_t errCode) {
|
|||
}
|
||||
|
||||
void schedulerDestroy(void) {
|
||||
int32_t code = 0;
|
||||
atomic_store_8((int8_t *)&schMgmt.exit, 1);
|
||||
|
||||
if (schMgmt.jobRef >= 0) {
|
||||
|
@ -195,7 +196,10 @@ void schedulerDestroy(void) {
|
|||
if (refId == 0) {
|
||||
break;
|
||||
}
|
||||
(void)taosRemoveRef(schMgmt.jobRef, pJob->refId); // ignore error
|
||||
code = taosRemoveRef(schMgmt.jobRef, pJob->refId);
|
||||
if (code) {
|
||||
qWarn("taosRemoveRef job refId:%" PRId64 " failed, error:%s", pJob->refId, tstrerror(code));
|
||||
}
|
||||
|
||||
pJob = taosIterateRef(schMgmt.jobRef, refId);
|
||||
}
|
||||
|
|
|
@ -488,10 +488,10 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
|
|||
if (pe->num == 0) {
|
||||
taosHashEntryWUnlock(pHashObj, pe);
|
||||
taosHashRUnlock(pHashObj);
|
||||
return -1;
|
||||
return TSDB_CODE_NOT_FOUND;
|
||||
}
|
||||
|
||||
int code = -1;
|
||||
int code = TSDB_CODE_NOT_FOUND;
|
||||
SHashNode *pNode = pe->next;
|
||||
SHashNode *prevNode = NULL;
|
||||
|
||||
|
|
|
@ -229,9 +229,8 @@ int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
|
||||
void taosReadQitem(STaosQueue *queue, void **ppItem) {
|
||||
STaosQnode *pNode = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
(void)taosThreadMutexLock(&queue->mutex);
|
||||
|
||||
|
@ -247,14 +246,11 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
|
|||
if (queue->qset) {
|
||||
(void)atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
|
||||
}
|
||||
code = 1;
|
||||
uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
|
||||
queue->memOfItems);
|
||||
}
|
||||
|
||||
(void)taosThreadMutexUnlock(&queue->mutex);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t taosAllocateQall(STaosQall **qall) {
|
||||
|
|
|
@ -426,7 +426,7 @@ static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove) {
|
|||
} else {
|
||||
uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid);
|
||||
terrno = TSDB_CODE_REF_NOT_EXIST;
|
||||
code = -1;
|
||||
code = terrno;
|
||||
}
|
||||
|
||||
taosUnlockList(pSet->lockedBy + hash);
|
||||
|
|
|
@ -297,7 +297,7 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key, size_t keyLen) {
|
|||
}
|
||||
|
||||
int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) {
|
||||
int32_t code = TSDB_CODE_FAILED;
|
||||
int32_t code = TSDB_CODE_INVALID_PARA;
|
||||
if (!pHashObj || !key) {
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -492,6 +492,9 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* han
|
|||
|
||||
if (timer == NULL) {
|
||||
*pTmrId = taosTmrStart(fp, mseconds, param, handle);
|
||||
if (NULL == *pTmrId) {
|
||||
stopped = true;
|
||||
}
|
||||
return stopped;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue