Merge pull request #27849 from taosdata/fix/TD-31896
fix: function return code issue
This commit is contained in:
commit
446d3ab97c
|
@ -79,7 +79,7 @@ void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp);
|
||||||
int32_t taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize, void **item);
|
int32_t taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize, void **item);
|
||||||
void taosFreeQitem(void *pItem);
|
void taosFreeQitem(void *pItem);
|
||||||
int32_t taosWriteQitem(STaosQueue *queue, void *pItem);
|
int32_t taosWriteQitem(STaosQueue *queue, void *pItem);
|
||||||
int32_t taosReadQitem(STaosQueue *queue, void **ppItem);
|
void taosReadQitem(STaosQueue *queue, void **ppItem);
|
||||||
bool taosQueueEmpty(STaosQueue *queue);
|
bool taosQueueEmpty(STaosQueue *queue);
|
||||||
void taosUpdateItemSize(STaosQueue *queue, int32_t items);
|
void taosUpdateItemSize(STaosQueue *queue, int32_t items);
|
||||||
int32_t taosQueueItemSize(STaosQueue *queue);
|
int32_t taosQueueItemSize(STaosQueue *queue);
|
||||||
|
|
|
@ -730,7 +730,7 @@ static void* monitorThreadFunc(void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
MonitorSlowLogData* slowLogData = NULL;
|
MonitorSlowLogData* slowLogData = NULL;
|
||||||
(void)taosReadQitem(monitorQueue, (void**)&slowLogData);
|
taosReadQitem(monitorQueue, (void**)&slowLogData);
|
||||||
if (slowLogData != NULL) {
|
if (slowLogData != NULL) {
|
||||||
if (slowLogData->type == SLOW_LOG_READ_BEGINNIG && quitCnt == 0) {
|
if (slowLogData->type == SLOW_LOG_READ_BEGINNIG && quitCnt == 0) {
|
||||||
if (slowLogData->pFile != NULL) {
|
if (slowLogData->pFile != NULL) {
|
||||||
|
|
|
@ -706,7 +706,9 @@ void ctgProcessTimerEvent(void *param, void *tmrId) {
|
||||||
int32_t code = ctgClearCacheEnqueue(NULL, true, false, false, false);
|
int32_t code = ctgClearCacheEnqueue(NULL, true, false, false, false);
|
||||||
if (code) {
|
if (code) {
|
||||||
qError("clear cache enqueue failed, error:%s", tstrerror(code));
|
qError("clear cache enqueue failed, error:%s", tstrerror(code));
|
||||||
(void)taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
|
if (taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer)) {
|
||||||
|
qError("reset catalog cache monitor timer error, timer stoppped");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
goto _return;
|
goto _return;
|
||||||
|
@ -714,7 +716,9 @@ void ctgProcessTimerEvent(void *param, void *tmrId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
qTrace("reset catalog timer");
|
qTrace("reset catalog timer");
|
||||||
(void)taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
|
if (taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer)) {
|
||||||
|
qError("reset catalog cache monitor timer error, timer stoppped");
|
||||||
|
}
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
|
@ -1517,10 +1521,16 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SC
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
if (pJob) {
|
if (pJob) {
|
||||||
(void)taosReleaseRef(gCtgMgmt.jobPool, pJob->refId);
|
int32_t code2 = taosReleaseRef(gCtgMgmt.jobPool, pJob->refId);
|
||||||
|
if (TSDB_CODE_SUCCESS) {
|
||||||
|
qError("release catalog job refId %" PRId64 "falied, error:%s", pJob->refId, tstrerror(code2));
|
||||||
|
}
|
||||||
|
|
||||||
if (code) {
|
if (code) {
|
||||||
(void)taosRemoveRef(gCtgMgmt.jobPool, pJob->refId);
|
code2 = taosRemoveRef(gCtgMgmt.jobPool, pJob->refId);
|
||||||
|
if (TSDB_CODE_SUCCESS) {
|
||||||
|
qError("remove catalog job refId %" PRId64 "falied, error:%s", pJob->refId, tstrerror(code2));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1967,7 +1977,9 @@ void catalogDestroy(void) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (gCtgMgmt.cacheTimer) {
|
if (gCtgMgmt.cacheTimer) {
|
||||||
(void)taosTmrStop(gCtgMgmt.cacheTimer);
|
if (taosTmrStop(gCtgMgmt.cacheTimer)) {
|
||||||
|
qTrace("stop catalog cache timer may failed");
|
||||||
|
}
|
||||||
gCtgMgmt.cacheTimer = NULL;
|
gCtgMgmt.cacheTimer = NULL;
|
||||||
taosTmrCleanUp(gCtgMgmt.timer);
|
taosTmrCleanUp(gCtgMgmt.timer);
|
||||||
gCtgMgmt.timer = NULL;
|
gCtgMgmt.timer = NULL;
|
||||||
|
|
|
@ -1007,7 +1007,11 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
|
||||||
CTG_ERR_JRET(terrno);
|
CTG_ERR_JRET(terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)taosAcquireRef(gCtgMgmt.jobPool, pJob->refId);
|
void* p = taosAcquireRef(gCtgMgmt.jobPool, pJob->refId);
|
||||||
|
if (NULL == p) {
|
||||||
|
ctgError("acquire job from ref failed, refId:%" PRId64 ", error: %s", pJob->refId, tstrerror(terrno));
|
||||||
|
CTG_ERR_JRET(terrno);
|
||||||
|
}
|
||||||
|
|
||||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
qDebug("qid:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d, elapsed time:%.2f ms",
|
qDebug("qid:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d, elapsed time:%.2f ms",
|
||||||
|
@ -1406,7 +1410,11 @@ int32_t ctgCallUserCb(void* param) {
|
||||||
|
|
||||||
qDebug("qid:0x%" PRIx64 " ctg end to call user cb", pJob->queryId);
|
qDebug("qid:0x%" PRIx64 " ctg end to call user cb", pJob->queryId);
|
||||||
|
|
||||||
(void)taosRemoveRef(gCtgMgmt.jobPool, pJob->refId);
|
int64_t refId = pJob->refId;
|
||||||
|
int32_t code = taosRemoveRef(gCtgMgmt.jobPool, refId);
|
||||||
|
if (code) {
|
||||||
|
qError("qid:0x%" PRIx64 " remove ctg job %" PRId64 " from jobPool failed, error:%s", pJob->queryId, refId, tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2915,21 +2915,27 @@ void ctgClearMetaCache(SCtgCacheOperation *operation) {
|
||||||
|
|
||||||
if (CTG_CACHE_LOW(remainSize, cacheMaxSize)) {
|
if (CTG_CACHE_LOW(remainSize, cacheMaxSize)) {
|
||||||
qDebug("catalog finish meta clear, remainSize:%" PRId64 ", cacheMaxSize:%dMB", remainSize, cacheMaxSize);
|
qDebug("catalog finish meta clear, remainSize:%" PRId64 ", cacheMaxSize:%dMB", remainSize, cacheMaxSize);
|
||||||
(void)taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
|
if (taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer)) {
|
||||||
|
qError("reset catalog cache monitor timer error, timer stoppped");
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!roundDone) {
|
if (!roundDone) {
|
||||||
qDebug("catalog all meta cleared, remainSize:%" PRId64 ", cacheMaxSize:%dMB, to clear handle", remainSize, cacheMaxSize);
|
qDebug("catalog all meta cleared, remainSize:%" PRId64 ", cacheMaxSize:%dMB, to clear handle", remainSize, cacheMaxSize);
|
||||||
ctgClearFreeCache(operation);
|
ctgClearFreeCache(operation);
|
||||||
(void)taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
|
if (taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer)) {
|
||||||
|
qError("reset catalog cache monitor timer error, timer stoppped");
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = ctgClearCacheEnqueue(NULL, true, false, false, false);
|
int32_t code = ctgClearCacheEnqueue(NULL, true, false, false, false);
|
||||||
if (code) {
|
if (code) {
|
||||||
qError("clear cache enqueue failed, error:%s", tstrerror(code));
|
qError("clear cache enqueue failed, error:%s", tstrerror(code));
|
||||||
(void)taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer);
|
if (taosTmrReset(ctgProcessTimerEvent, CTG_DEFAULT_CACHE_MON_MSEC, NULL, gCtgMgmt.timer, &gCtgMgmt.cacheTimer)) {
|
||||||
|
qError("reset catalog cache monitor timer error, timer stoppped");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2993,7 +2999,10 @@ int32_t ctgOpDropTbTSMA(SCtgCacheOperation *operation) {
|
||||||
pCtgCache->pTsmas = NULL;
|
pCtgCache->pTsmas = NULL;
|
||||||
|
|
||||||
ctgDebug("all tsmas for table dropped: %s.%s", msg->dbFName, msg->tbName);
|
ctgDebug("all tsmas for table dropped: %s.%s", msg->dbFName, msg->tbName);
|
||||||
(void)taosHashRemove(dbCache->tsmaCache, msg->tbName, TSDB_TABLE_NAME_LEN);
|
code = taosHashRemove(dbCache->tsmaCache, msg->tbName, TSDB_TABLE_NAME_LEN);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
ctgError("remove table %s.%s from tsmaCache failed, error:%s", msg->dbFName, msg->tbName, tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
CTG_UNLOCK(CTG_WRITE, &pCtgCache->tsmaLock);
|
CTG_UNLOCK(CTG_WRITE, &pCtgCache->tsmaLock);
|
||||||
} else {
|
} else {
|
||||||
|
@ -3191,6 +3200,7 @@ void ctgCleanupCacheQueue(void) {
|
||||||
SCtgQNode *nodeNext = NULL;
|
SCtgQNode *nodeNext = NULL;
|
||||||
SCtgCacheOperation *op = NULL;
|
SCtgCacheOperation *op = NULL;
|
||||||
bool stopQueue = false;
|
bool stopQueue = false;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
node = gCtgMgmt.queue.head->next;
|
node = gCtgMgmt.queue.head->next;
|
||||||
|
@ -3209,7 +3219,10 @@ void ctgCleanupCacheQueue(void) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (op->syncOp) {
|
if (op->syncOp) {
|
||||||
(void)tsem_post(&op->rspSem);
|
code = tsem_post(&op->rspSem);
|
||||||
|
if (code) {
|
||||||
|
qError("tsem_post failed when cleanup cache queue, error:%s", tstrerror(code));
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
taosMemoryFree(op);
|
taosMemoryFree(op);
|
||||||
}
|
}
|
||||||
|
@ -3234,12 +3247,13 @@ void ctgCleanupCacheQueue(void) {
|
||||||
|
|
||||||
void *ctgUpdateThreadFunc(void *param) {
|
void *ctgUpdateThreadFunc(void *param) {
|
||||||
setThreadName("catalog");
|
setThreadName("catalog");
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
qInfo("catalog update thread started");
|
qInfo("catalog update thread started");
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (tsem_wait(&gCtgMgmt.queue.reqSem)) {
|
if (tsem_wait(&gCtgMgmt.queue.reqSem)) {
|
||||||
qError("ctg tsem_wait failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
qError("ctg tsem_wait failed, error:%s", tstrerror(terrno));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (atomic_load_8((int8_t *)&gCtgMgmt.queue.stopQueue)) {
|
if (atomic_load_8((int8_t *)&gCtgMgmt.queue.stopQueue)) {
|
||||||
|
@ -3256,7 +3270,10 @@ void *ctgUpdateThreadFunc(void *param) {
|
||||||
(void)(*gCtgCacheOperation[operation->opId].func)(operation); // ignore any error
|
(void)(*gCtgCacheOperation[operation->opId].func)(operation); // ignore any error
|
||||||
|
|
||||||
if (operation->syncOp) {
|
if (operation->syncOp) {
|
||||||
(void)tsem_post(&operation->rspSem);
|
code = tsem_post(&operation->rspSem);
|
||||||
|
if (code) {
|
||||||
|
ctgError("tsem_post failed for syncOp update, error:%s", tstrerror(code));
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
taosMemoryFreeClear(operation);
|
taosMemoryFreeClear(operation);
|
||||||
}
|
}
|
||||||
|
|
|
@ -470,7 +470,10 @@ _return:
|
||||||
taosMemoryFree(pMsg->pEpSet);
|
taosMemoryFree(pMsg->pEpSet);
|
||||||
|
|
||||||
if (pJob) {
|
if (pJob) {
|
||||||
(void)taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId);
|
int32_t code2 = taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId);
|
||||||
|
if (code2) {
|
||||||
|
qError("release ctg job refId:%" PRId64 " failed, error:%s", cbParam->refId, tstrerror(code2));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
CTG_API_LEAVE(code);
|
CTG_API_LEAVE(code);
|
||||||
|
@ -1087,11 +1090,16 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* n
|
||||||
int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
|
int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
|
||||||
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
|
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
|
||||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||||
(void)tNameExtractFullName(name, tbFName);
|
|
||||||
|
|
||||||
ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName);
|
ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName);
|
||||||
|
|
||||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp);
|
int32_t code = tNameExtractFullName(name, tbFName);
|
||||||
|
if (code) {
|
||||||
|
ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), name->type, name->dbname, name->tname);
|
||||||
|
CTG_ERR_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp);
|
||||||
if (code) {
|
if (code) {
|
||||||
ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
|
ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
|
||||||
CTG_ERR_RET(code);
|
CTG_ERR_RET(code);
|
||||||
|
@ -1403,17 +1411,23 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
|
||||||
int32_t msgLen = 0;
|
int32_t msgLen = 0;
|
||||||
int32_t reqType = TDMT_VND_TABLE_CFG;
|
int32_t reqType = TDMT_VND_TABLE_CFG;
|
||||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||||
(void)tNameExtractFullName(pTableName, tbFName);
|
|
||||||
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
|
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
|
||||||
char dbFName[TSDB_DB_FNAME_LEN];
|
char dbFName[TSDB_DB_FNAME_LEN];
|
||||||
(void)tNameGetFullDbName(pTableName, dbFName);
|
(void)tNameGetFullDbName(pTableName, dbFName);
|
||||||
SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
|
SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
|
||||||
|
|
||||||
|
int32_t code = tNameExtractFullName(pTableName, tbFName);
|
||||||
|
if (code) {
|
||||||
|
ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTableName->type, pTableName->dbname, pTableName->tname);
|
||||||
|
CTG_ERR_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
|
SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
|
||||||
ctgDebug("try to get table cfg from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId,
|
ctgDebug("try to get table cfg from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId,
|
||||||
vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
|
vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
|
||||||
|
|
||||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
|
code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
|
||||||
if (code) {
|
if (code) {
|
||||||
ctgError("Build get tb cfg msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
|
ctgError("Build get tb cfg msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
|
||||||
CTG_ERR_RET(code);
|
CTG_ERR_RET(code);
|
||||||
|
@ -1471,15 +1485,20 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
|
||||||
int32_t msgLen = 0;
|
int32_t msgLen = 0;
|
||||||
int32_t reqType = TDMT_MND_TABLE_CFG;
|
int32_t reqType = TDMT_MND_TABLE_CFG;
|
||||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||||
(void)tNameExtractFullName(pTableName, tbFName);
|
|
||||||
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
|
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
|
||||||
char dbFName[TSDB_DB_FNAME_LEN];
|
char dbFName[TSDB_DB_FNAME_LEN];
|
||||||
(void)tNameGetFullDbName(pTableName, dbFName);
|
(void)tNameGetFullDbName(pTableName, dbFName);
|
||||||
SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
|
SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
|
||||||
|
|
||||||
|
int32_t code = tNameExtractFullName(pTableName, tbFName);
|
||||||
|
if (code) {
|
||||||
|
ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTableName->type, pTableName->dbname, pTableName->tname);
|
||||||
|
CTG_ERR_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
ctgDebug("try to get table cfg from mnode, tbFName:%s", tbFName);
|
ctgDebug("try to get table cfg from mnode, tbFName:%s", tbFName);
|
||||||
|
|
||||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
|
code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
|
||||||
if (code) {
|
if (code) {
|
||||||
ctgError("Build get tb cfg msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
|
ctgError("Build get tb cfg msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
|
||||||
CTG_ERR_RET(code);
|
CTG_ERR_RET(code);
|
||||||
|
@ -1583,11 +1602,15 @@ int32_t ctgGetViewInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName*
|
||||||
SCtgTask* pTask = tReq ? tReq->pTask : NULL;
|
SCtgTask* pTask = tReq ? tReq->pTask : NULL;
|
||||||
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
|
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
|
||||||
char fullName[TSDB_TABLE_FNAME_LEN];
|
char fullName[TSDB_TABLE_FNAME_LEN];
|
||||||
(void)tNameExtractFullName(pName, fullName);
|
int32_t code = tNameExtractFullName(pName, fullName);
|
||||||
|
if (code) {
|
||||||
|
ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pName->type, pName->dbname, pName->tname);
|
||||||
|
CTG_ERR_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
ctgDebug("try to get view info from mnode, viewFName:%s", fullName);
|
ctgDebug("try to get view info from mnode, viewFName:%s", fullName);
|
||||||
|
|
||||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](fullName, &msg, 0, &msgLen, mallocFp);
|
code = queryBuildMsg[TMSG_INDEX(reqType)](fullName, &msg, 0, &msgLen, mallocFp);
|
||||||
if (code) {
|
if (code) {
|
||||||
ctgError("Build view-meta msg failed, code:%x, viewFName:%s", code, fullName);
|
ctgError("Build view-meta msg failed, code:%x, viewFName:%s", code, fullName);
|
||||||
CTG_ERR_RET(code);
|
CTG_ERR_RET(code);
|
||||||
|
@ -1640,11 +1663,15 @@ int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa
|
||||||
SCtgTask* pTask = tReq ? tReq->pTask : NULL;
|
SCtgTask* pTask = tReq ? tReq->pTask : NULL;
|
||||||
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
|
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
|
||||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||||
(void)tNameExtractFullName(name, tbFName);
|
int32_t code = tNameExtractFullName(name, tbFName);
|
||||||
|
if (code) {
|
||||||
|
ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), name->type, name->dbname, name->tname);
|
||||||
|
CTG_ERR_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName);
|
ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName);
|
||||||
|
|
||||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp);
|
code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp);
|
||||||
if (code) {
|
if (code) {
|
||||||
ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
|
ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
|
||||||
CTG_ERR_RET(code);
|
CTG_ERR_RET(code);
|
||||||
|
@ -1697,7 +1724,12 @@ int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, c
|
||||||
int32_t msgLen = 0;
|
int32_t msgLen = 0;
|
||||||
int32_t reqType = TDMT_VND_GET_STREAM_PROGRESS;
|
int32_t reqType = TDMT_VND_GET_STREAM_PROGRESS;
|
||||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||||
(void)tNameExtractFullName(pTbName, tbFName);
|
int32_t code = tNameExtractFullName(pTbName, tbFName);
|
||||||
|
if (code) {
|
||||||
|
ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTbName->type, pTbName->dbname, pTbName->tname);
|
||||||
|
CTG_ERR_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
SCtgTask* pTask = tReq ? tReq->pTask : NULL;
|
SCtgTask* pTask = tReq ? tReq->pTask : NULL;
|
||||||
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
|
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
|
||||||
|
|
||||||
|
@ -1705,7 +1737,7 @@ int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, c
|
||||||
ctgDebug("try to get stream progress from vnode, vgId:%d, ep num:%d, ep %s:%d, target:%s", vgroupInfo->vgId,
|
ctgDebug("try to get stream progress from vnode, vgId:%d, ep num:%d, ep %s:%d, target:%s", vgroupInfo->vgId,
|
||||||
vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
|
vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
|
||||||
|
|
||||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](bInput, &msg, 0, &msgLen, mallocFp);
|
code = queryBuildMsg[TMSG_INDEX(reqType)](bInput, &msg, 0, &msgLen, mallocFp);
|
||||||
if (code) {
|
if (code) {
|
||||||
ctgError("Build get stream progress failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
|
ctgError("Build get stream progress failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
|
||||||
CTG_ERR_RET(code);
|
CTG_ERR_RET(code);
|
||||||
|
|
|
@ -433,6 +433,7 @@ void ctgFreeHandle(SCatalog* pCtg) {
|
||||||
|
|
||||||
void ctgClearHandleMeta(SCatalog* pCtg, int64_t* pClearedSize, int64_t* pCleardNum, bool* roundDone) {
|
void ctgClearHandleMeta(SCatalog* pCtg, int64_t* pClearedSize, int64_t* pCleardNum, bool* roundDone) {
|
||||||
int64_t cacheSize = 0;
|
int64_t cacheSize = 0;
|
||||||
|
int32_t code = 0;
|
||||||
void* pIter = taosHashIterate(pCtg->dbCache, NULL);
|
void* pIter = taosHashIterate(pCtg->dbCache, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
SCtgDBCache* dbCache = pIter;
|
SCtgDBCache* dbCache = pIter;
|
||||||
|
@ -447,7 +448,10 @@ void ctgClearHandleMeta(SCatalog* pCtg, int64_t* pClearedSize, int64_t* pCleardN
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)taosHashRemove(dbCache->tbCache, key, len);
|
code = taosHashRemove(dbCache->tbCache, key, len);
|
||||||
|
if (code) {
|
||||||
|
qError("taosHashRemove table cache failed, key:%s, len:%d, error:%s", (char*)key, (int32_t)len, tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
cacheSize =
|
cacheSize =
|
||||||
len + sizeof(SCtgTbCache) + ctgGetTbMetaCacheSize(pCache->pMeta) + ctgGetTbIndexCacheSize(pCache->pIndex);
|
len + sizeof(SCtgTbCache) + ctgGetTbMetaCacheSize(pCache->pMeta) + ctgGetTbIndexCacheSize(pCache->pIndex);
|
||||||
|
@ -1201,7 +1205,11 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SEpSet* pMgmtEps, SDBVgInfo* d
|
||||||
|
|
||||||
SVgroupInfo* vgInfo = NULL;
|
SVgroupInfo* vgInfo = NULL;
|
||||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
||||||
(void)tNameExtractFullName(pTableName, tbFullName);
|
code = tNameExtractFullName(pTableName, tbFullName);
|
||||||
|
if (code) {
|
||||||
|
ctgError("tNameExtractFullName failed, error:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTableName->type, pTableName->dbname, pTableName->tname);
|
||||||
|
CTG_ERR_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
|
uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
|
||||||
dbInfo->hashPrefix, dbInfo->hashSuffix);
|
dbInfo->hashPrefix, dbInfo->hashSuffix);
|
||||||
|
@ -1965,7 +1973,12 @@ int32_t ctgChkSetTbAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res) {
|
||||||
|
|
||||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||||
char dbFName[TSDB_DB_FNAME_LEN];
|
char dbFName[TSDB_DB_FNAME_LEN];
|
||||||
(void)tNameExtractFullName(&req->pRawReq->tbName, tbFName);
|
code = tNameExtractFullName(&req->pRawReq->tbName, tbFName);
|
||||||
|
if (code) {
|
||||||
|
ctgError("tNameExtractFullName failed, error:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), req->pRawReq->tbName.type, req->pRawReq->tbName.dbname, req->pRawReq->tbName.tname);
|
||||||
|
CTG_ERR_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
(void)tNameGetFullDbName(&req->pRawReq->tbName, dbFName);
|
(void)tNameGetFullDbName(&req->pRawReq->tbName, dbFName);
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
@ -2151,7 +2164,11 @@ int32_t ctgChkSetViewAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res)
|
||||||
if (IS_SYS_DBNAME(req->pRawReq->tbName.dbname)) {
|
if (IS_SYS_DBNAME(req->pRawReq->tbName.dbname)) {
|
||||||
(void)snprintf(viewFName, sizeof(viewFName), "%s.%s", req->pRawReq->tbName.dbname, req->pRawReq->tbName.tname);
|
(void)snprintf(viewFName, sizeof(viewFName), "%s.%s", req->pRawReq->tbName.dbname, req->pRawReq->tbName.tname);
|
||||||
} else {
|
} else {
|
||||||
(void)tNameExtractFullName(&req->pRawReq->tbName, viewFName);
|
code = tNameExtractFullName(&req->pRawReq->tbName, viewFName);
|
||||||
|
if (code) {
|
||||||
|
ctgError("tNameExtractFullName failed, error:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), req->pRawReq->tbName.type, req->pRawReq->tbName.dbname, req->pRawReq->tbName.tname);
|
||||||
|
CTG_ERR_RET(code);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
int32_t len = strlen(viewFName) + 1;
|
int32_t len = strlen(viewFName) + 1;
|
||||||
|
|
||||||
|
|
|
@ -190,7 +190,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRaw
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataDeleterBuf* pBuf = NULL;
|
SDataDeleterBuf* pBuf = NULL;
|
||||||
(void)taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
|
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
|
||||||
if (pBuf != NULL) {
|
if (pBuf != NULL) {
|
||||||
TAOS_MEMCPY(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf));
|
TAOS_MEMCPY(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf));
|
||||||
taosFreeQitem(pBuf);
|
taosFreeQitem(pBuf);
|
||||||
|
@ -248,7 +248,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
|
||||||
taosMemoryFree(pDeleter->pParam);
|
taosMemoryFree(pDeleter->pParam);
|
||||||
while (!taosQueueEmpty(pDeleter->pDataBlocks)) {
|
while (!taosQueueEmpty(pDeleter->pDataBlocks)) {
|
||||||
SDataDeleterBuf* pBuf = NULL;
|
SDataDeleterBuf* pBuf = NULL;
|
||||||
(void)taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
|
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
|
||||||
|
|
||||||
if (pBuf != NULL) {
|
if (pBuf != NULL) {
|
||||||
taosMemoryFreeClear(pBuf->pData);
|
taosMemoryFreeClear(pBuf->pData);
|
||||||
|
|
|
@ -233,7 +233,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRow
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataDispatchBuf* pBuf = NULL;
|
SDataDispatchBuf* pBuf = NULL;
|
||||||
(void)taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
|
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
|
||||||
if (pBuf != NULL) {
|
if (pBuf != NULL) {
|
||||||
TAOS_MEMCPY(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
|
TAOS_MEMCPY(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
|
||||||
taosFreeQitem(pBuf);
|
taosFreeQitem(pBuf);
|
||||||
|
@ -291,7 +291,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
|
||||||
|
|
||||||
while (!taosQueueEmpty(pDispatcher->pDataBlocks)) {
|
while (!taosQueueEmpty(pDispatcher->pDataBlocks)) {
|
||||||
SDataDispatchBuf* pBuf = NULL;
|
SDataDispatchBuf* pBuf = NULL;
|
||||||
(void)taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
|
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
|
||||||
if (pBuf != NULL) {
|
if (pBuf != NULL) {
|
||||||
taosMemoryFreeClear(pBuf->pData);
|
taosMemoryFreeClear(pBuf->pData);
|
||||||
taosFreeQitem(pBuf);
|
taosFreeQitem(pBuf);
|
||||||
|
|
|
@ -57,6 +57,7 @@ typedef struct SSubmitRspParam {
|
||||||
int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
|
int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
SSubmitRspParam* pParam = (SSubmitRspParam*)param;
|
SSubmitRspParam* pParam = (SSubmitRspParam*)param;
|
||||||
SDataInserterHandle* pInserter = pParam->pInserter;
|
SDataInserterHandle* pInserter = pParam->pInserter;
|
||||||
|
int32_t code2 = 0;
|
||||||
|
|
||||||
if (code) {
|
if (code) {
|
||||||
pInserter->submitRes.code = code;
|
pInserter->submitRes.code = code;
|
||||||
|
@ -106,7 +107,14 @@ int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
(void)tsem_post(&pInserter->ready);
|
code2 = tsem_post(&pInserter->ready);
|
||||||
|
if (code2 < 0) {
|
||||||
|
qError("tsem_post inserter ready failed, error:%s", tstrerror(code2));
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
pInserter->submitRes.code = code2;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
taosMemoryFree(pMsg->pData);
|
taosMemoryFree(pMsg->pData);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -562,6 +562,7 @@ static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPost
|
||||||
|
|
||||||
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo* pStbJoin) {
|
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo* pStbJoin) {
|
||||||
SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
|
SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
pPost->isStarted = false;
|
pPost->isStarted = false;
|
||||||
|
|
||||||
|
@ -572,7 +573,11 @@ static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDy
|
||||||
if (pPost->leftNeedCache) {
|
if (pPost->leftNeedCache) {
|
||||||
uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
|
uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
|
||||||
if (num && --(*num) <= 0) {
|
if (num && --(*num) <= 0) {
|
||||||
(void)tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
|
code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
|
||||||
|
if (code) {
|
||||||
|
qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
|
||||||
|
QRY_ERR_RET(code);
|
||||||
|
}
|
||||||
QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
|
QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -580,7 +585,11 @@ static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDy
|
||||||
if (!pPost->rightNeedCache) {
|
if (!pPost->rightNeedCache) {
|
||||||
void* v = tSimpleHashGet(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
|
void* v = tSimpleHashGet(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
|
||||||
if (NULL != v) {
|
if (NULL != v) {
|
||||||
(void)tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
|
code = tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
|
||||||
|
if (code) {
|
||||||
|
qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
|
||||||
|
QRY_ERR_RET(code);
|
||||||
|
}
|
||||||
QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
|
QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -661,7 +670,11 @@ static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnc
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
if (1 == (*pNum)) {
|
if (1 == (*pNum)) {
|
||||||
(void)tSimpleHashRemove(pOnceHash, pKey, keySize);
|
code = tSimpleHashRemove(pOnceHash, pKey, keySize);
|
||||||
|
if (code) {
|
||||||
|
qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
|
||||||
|
QRY_ERR_RET(code);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
(*pNum)++;
|
(*pNum)++;
|
||||||
break;
|
break;
|
||||||
|
@ -812,8 +825,12 @@ static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
uint64_t* pUid = NULL;
|
uint64_t* pUid = NULL;
|
||||||
int32_t iter = 0;
|
int32_t iter = 0;
|
||||||
|
int32_t code = 0;
|
||||||
while (NULL != (pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter))) {
|
while (NULL != (pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter))) {
|
||||||
(void)tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid));
|
code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid));
|
||||||
|
if (code) {
|
||||||
|
qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache);
|
pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache);
|
||||||
|
|
|
@ -30,7 +30,9 @@
|
||||||
|
|
||||||
static void removeGroupCacheFile(SGroupCacheFileInfo* pFileInfo) {
|
static void removeGroupCacheFile(SGroupCacheFileInfo* pFileInfo) {
|
||||||
if (pFileInfo->fd.fd) {
|
if (pFileInfo->fd.fd) {
|
||||||
(void)taosCloseFile(&pFileInfo->fd.fd);
|
if (taosCloseFile(&pFileInfo->fd.fd) < 0) {
|
||||||
|
qError("close group cache file failed, fd:%p, error:%s", pFileInfo->fd.fd, tstrerror(terrno));
|
||||||
|
}
|
||||||
pFileInfo->fd.fd = NULL;
|
pFileInfo->fd.fd = NULL;
|
||||||
(void)taosThreadMutexDestroy(&pFileInfo->fd.mutex);
|
(void)taosThreadMutexDestroy(&pFileInfo->fd.mutex);
|
||||||
}
|
}
|
||||||
|
@ -92,7 +94,9 @@ static void logGroupCacheExecInfo(SGroupCacheOperatorInfo* pGrpCacheOperator) {
|
||||||
static void freeSGcSessionCtx(void* p) {
|
static void freeSGcSessionCtx(void* p) {
|
||||||
SGcSessionCtx* pSession = p;
|
SGcSessionCtx* pSession = p;
|
||||||
if (pSession->semInit) {
|
if (pSession->semInit) {
|
||||||
(void)tsem_destroy(&pSession->waitSem);
|
if (tsem_destroy(&pSession->waitSem) < 0) {
|
||||||
|
qError("tsem_destroy session waitSem failed, error:%s", tstrerror(terrno));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -262,6 +266,9 @@ static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, int32_t fileId, S
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void releaseFdToFileCtx(SGroupCacheFileFd* pFd) {
|
static FORCE_INLINE void releaseFdToFileCtx(SGroupCacheFileFd* pFd) {
|
||||||
|
if (NULL == pFd) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
(void)taosThreadMutexUnlock(&pFd->mutex);
|
(void)taosThreadMutexUnlock(&pFd->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -274,6 +281,8 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC
|
||||||
SGroupCacheData* pGroup = NULL;
|
SGroupCacheData* pGroup = NULL;
|
||||||
|
|
||||||
while (NULL != pHead) {
|
while (NULL != pHead) {
|
||||||
|
pFd = NULL;
|
||||||
|
|
||||||
if (pGCache->batchFetch) {
|
if (pGCache->batchFetch) {
|
||||||
pFileCtx = &pHead->pCtx->fileCtx;
|
pFileCtx = &pHead->pCtx->fileCtx;
|
||||||
} else {
|
} else {
|
||||||
|
@ -290,7 +299,11 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC
|
||||||
|
|
||||||
int64_t blkId = pHead->basic.blkId;
|
int64_t blkId = pHead->basic.blkId;
|
||||||
pHead = pHead->next;
|
pHead = pHead->next;
|
||||||
(void)taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
|
code = taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
|
||||||
|
if (code) {
|
||||||
|
qError("taosHashRemove blk %" PRId64 " from diryBlk failed, error:%s", blkId, tstrerror(code));
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -304,13 +317,19 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC
|
||||||
}
|
}
|
||||||
|
|
||||||
if (deleted) {
|
if (deleted) {
|
||||||
|
releaseFdToFileCtx(pFd);
|
||||||
|
|
||||||
qTrace("FileId:%d-%d-%d already be deleted, skip write",
|
qTrace("FileId:%d-%d-%d already be deleted, skip write",
|
||||||
pCtx->id, pGroup ? pGroup->vgId : GROUP_CACHE_DEFAULT_VGID, pHead->basic.fileId);
|
pCtx->id, pGroup ? pGroup->vgId : GROUP_CACHE_DEFAULT_VGID, pHead->basic.fileId);
|
||||||
|
|
||||||
int64_t blkId = pHead->basic.blkId;
|
int64_t blkId = pHead->basic.blkId;
|
||||||
pHead = pHead->next;
|
pHead = pHead->next;
|
||||||
|
|
||||||
(void)taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
|
code = taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
|
||||||
|
if (code) {
|
||||||
|
qError("taosHashRemove blk %" PRId64 " from diryBlk failed, error:%s", blkId, tstrerror(code));
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -336,7 +355,11 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC
|
||||||
int64_t blkId = pHead->basic.blkId;
|
int64_t blkId = pHead->basic.blkId;
|
||||||
pHead = pHead->next;
|
pHead = pHead->next;
|
||||||
|
|
||||||
(void)taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
|
code = taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
|
||||||
|
if (code) {
|
||||||
|
qError("taosHashRemove blk %" PRId64 " from diryBlk failed, error:%s", blkId, tstrerror(code));
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
@ -1038,7 +1061,11 @@ static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator
|
||||||
}
|
}
|
||||||
SGcSessionCtx* pWaitCtx = *ppWaitCtx;
|
SGcSessionCtx* pWaitCtx = *ppWaitCtx;
|
||||||
pWaitCtx->newFetch = true;
|
pWaitCtx->newFetch = true;
|
||||||
(void)taosHashRemove(pCtx->pWaitSessions, pSessionId, sizeof(*pSessionId));
|
code = taosHashRemove(pCtx->pWaitSessions, pSessionId, sizeof(*pSessionId));
|
||||||
|
if (code) {
|
||||||
|
qError("taosHashRemove session %" PRId64 " from waitSession failed, error: %s", *pSessionId, tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
QRY_ERR_RET(tsem_post(&pWaitCtx->waitSem));
|
QRY_ERR_RET(tsem_post(&pWaitCtx->waitSem));
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -1127,14 +1154,22 @@ static int32_t groupCacheSessionWait(struct SOperatorInfo* pOperator, SGcDownstr
|
||||||
|
|
||||||
QRY_ERR_JRET(taosHashPut(pCtx->pWaitSessions, &sessionId, sizeof(sessionId), &pSession, POINTER_BYTES));
|
QRY_ERR_JRET(taosHashPut(pCtx->pWaitSessions, &sessionId, sizeof(sessionId), &pSession, POINTER_BYTES));
|
||||||
|
|
||||||
(void)tsem_wait(&pSession->waitSem);
|
code = tsem_wait(&pSession->waitSem);
|
||||||
|
if (code) {
|
||||||
|
qError("tsem_wait failed, error:%s", tstrerror(code));
|
||||||
|
QRY_ERR_JRET(code);
|
||||||
|
}
|
||||||
|
|
||||||
if (pSession->newFetch) {
|
if (pSession->newFetch) {
|
||||||
pSession->newFetch = false;
|
pSession->newFetch = false;
|
||||||
return getCacheBlkFromDownstreamOperator(pOperator, pCtx, sessionId, pSession, ppRes);
|
return getCacheBlkFromDownstreamOperator(pOperator, pCtx, sessionId, pSession, ppRes);
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)taosHashRemove(pCtx->pWaitSessions, &sessionId, sizeof(sessionId));
|
code = taosHashRemove(pCtx->pWaitSessions, &sessionId, sizeof(sessionId));
|
||||||
|
if (code) {
|
||||||
|
qError("taosHashRemove session %" PRId64 " from waitSession failed, error: %s", sessionId, tstrerror(code));
|
||||||
|
QRY_ERR_JRET(code);
|
||||||
|
}
|
||||||
|
|
||||||
bool got = false;
|
bool got = false;
|
||||||
return getBlkFromSessionCacheImpl(pOperator, sessionId, pSession, ppRes, &got);
|
return getBlkFromSessionCacheImpl(pOperator, sessionId, pSession, ppRes, &got);
|
||||||
|
@ -1280,14 +1315,22 @@ static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock
|
||||||
SSDataBlock** ppBlock = taosHashGet(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
|
SSDataBlock** ppBlock = taosHashGet(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
|
||||||
if (ppBlock) {
|
if (ppBlock) {
|
||||||
QRY_ERR_RET(releaseBaseBlockToList(pCtx, *ppBlock));
|
QRY_ERR_RET(releaseBaseBlockToList(pCtx, *ppBlock));
|
||||||
(void)taosHashRemove(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
|
code = taosHashRemove(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
|
||||||
|
if (code) {
|
||||||
|
qError("taosHashRemove session %" PRId64 " from pReadBlk failed, error: %s", pGcParam->sessionId, tstrerror(code));
|
||||||
|
QRY_ERR_RET(code);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = getBlkFromSessionCache(pOperator, pGcParam->sessionId, pSession, ppRes);
|
code = getBlkFromSessionCache(pOperator, pGcParam->sessionId, pSession, ppRes);
|
||||||
if (NULL == *ppRes) {
|
if (NULL == *ppRes) {
|
||||||
qDebug("session %" PRId64 " in downstream %d total got %" PRId64 " rows", pGcParam->sessionId, pCtx->id, pSession->resRows);
|
qDebug("session %" PRId64 " in downstream %d total got %" PRId64 " rows", pGcParam->sessionId, pCtx->id, pSession->resRows);
|
||||||
(void)taosHashRemove(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
|
code = taosHashRemove(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
|
||||||
|
if (code) {
|
||||||
|
qError("taosHashRemove session %" PRId64 " from pSessions failed, error: %s", pGcParam->sessionId, tstrerror(code));
|
||||||
|
QRY_ERR_RET(code);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
pSession->resRows += (*ppRes)->info.rows;
|
pSession->resRows += (*ppRes)->info.rows;
|
||||||
qDebug("session %" PRId64 " in downstream %d got %" PRId64 " rows in one block", pGcParam->sessionId, pCtx->id, (*ppRes)->info.rows);
|
qDebug("session %" PRId64 " in downstream %d got %" PRId64 " rows in one block", pGcParam->sessionId, pCtx->id, (*ppRes)->info.rows);
|
||||||
|
|
|
@ -2413,7 +2413,10 @@ int32_t mAsofForwardChkFillGrpCache(SMJoinWindowCtx* pCtx) {
|
||||||
pGrp->readIdx = 0;
|
pGrp->readIdx = 0;
|
||||||
//pGrp->endIdx = pGrp->blk->info.rows - 1;
|
//pGrp->endIdx = pGrp->blk->info.rows - 1;
|
||||||
} else {
|
} else {
|
||||||
(void)taosArrayPop(pCache->grps);
|
if (NULL == taosArrayPop(pCache->grps)) {
|
||||||
|
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
pGrp = taosArrayGet(pCache->grps, 0);
|
pGrp = taosArrayGet(pCache->grps, 0);
|
||||||
if (NULL == pGrp) {
|
if (NULL == pGrp) {
|
||||||
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||||
|
|
|
@ -1378,7 +1378,9 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
if (pTable->noKeepEqGrpRows || !keepGrp || (!pTable->multiEqGrpRows && !restart)) {
|
if (pTable->noKeepEqGrpRows || !keepGrp || (!pTable->multiEqGrpRows && !restart)) {
|
||||||
(void)taosArrayPop(pTable->eqGrps);
|
if (NULL == taosArrayPop(pTable->eqGrps)) {
|
||||||
|
code = terrno;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1;
|
pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -581,7 +581,10 @@ void qwDestroyImpl(void *pMgmt) {
|
||||||
int32_t schStatusCount = 0;
|
int32_t schStatusCount = 0;
|
||||||
qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt);
|
qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt);
|
||||||
|
|
||||||
(void)taosTmrStop(mgmt->hbTimer); //ignore error
|
if (taosTmrStop(mgmt->hbTimer)) {
|
||||||
|
qTrace("stop qworker hb timer may failed");
|
||||||
|
}
|
||||||
|
|
||||||
mgmt->hbTimer = NULL;
|
mgmt->hbTimer = NULL;
|
||||||
taosTmrCleanUp(mgmt->timer);
|
taosTmrCleanUp(mgmt->timer);
|
||||||
|
|
||||||
|
|
|
@ -1187,7 +1187,9 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
|
||||||
int32_t schNum = taosHashGetSize(mgmt->schHash);
|
int32_t schNum = taosHashGetSize(mgmt->schHash);
|
||||||
if (schNum <= 0) {
|
if (schNum <= 0) {
|
||||||
QW_UNLOCK(QW_READ, &mgmt->schLock);
|
QW_UNLOCK(QW_READ, &mgmt->schLock);
|
||||||
(void)taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); // ignore error
|
if (taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer)) {
|
||||||
|
qError("reset qworker hb timer error, timer stoppped");
|
||||||
|
}
|
||||||
(void)qwRelease(refId); // ignore error
|
(void)qwRelease(refId); // ignore error
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1199,7 +1201,9 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
|
||||||
taosMemoryFree(rspList);
|
taosMemoryFree(rspList);
|
||||||
taosArrayDestroy(pExpiredSch);
|
taosArrayDestroy(pExpiredSch);
|
||||||
QW_ELOG("calloc %d SQWHbInfo failed, code:%x", schNum, terrno);
|
QW_ELOG("calloc %d SQWHbInfo failed, code:%x", schNum, terrno);
|
||||||
(void)taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); // ignore error
|
if (taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer)) {
|
||||||
|
qError("reset qworker hb timer error, timer stoppped");
|
||||||
|
}
|
||||||
(void)qwRelease(refId); // ignore error
|
(void)qwRelease(refId); // ignore error
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1255,7 +1259,10 @@ _return:
|
||||||
taosMemoryFreeClear(rspList);
|
taosMemoryFreeClear(rspList);
|
||||||
taosArrayDestroy(pExpiredSch);
|
taosArrayDestroy(pExpiredSch);
|
||||||
|
|
||||||
(void)taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); // ignore error
|
if (taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer)) {
|
||||||
|
qError("reset qworker hb timer error, timer stoppped");
|
||||||
|
}
|
||||||
|
|
||||||
(void)qwRelease(refId); // ignore error
|
(void)qwRelease(refId); // ignore error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -414,7 +414,9 @@ extern SSchedulerMgmt schMgmt;
|
||||||
#define SCH_LOG_TASK_START_TS(_task) \
|
#define SCH_LOG_TASK_START_TS(_task) \
|
||||||
do { \
|
do { \
|
||||||
int64_t us = taosGetTimestampUs(); \
|
int64_t us = taosGetTimestampUs(); \
|
||||||
(void)taosArrayPush((_task)->profile.execTime, &us); \
|
if (NULL == taosArrayPush((_task)->profile.execTime, &us)) { \
|
||||||
|
qError("taosArrayPush task execTime failed, error:%s", tstrerror(terrno)); \
|
||||||
|
} \
|
||||||
if (0 == (_task)->execId) { \
|
if (0 == (_task)->execId) { \
|
||||||
(_task)->profile.startTs = us; \
|
(_task)->profile.startTs = us; \
|
||||||
} \
|
} \
|
||||||
|
|
|
@ -512,6 +512,7 @@ int32_t schNotifyUserFetchRes(SSchJob *pJob) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) {
|
void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) {
|
||||||
|
int32_t code = 0;
|
||||||
SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
|
SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
|
||||||
|
|
||||||
if (SCH_OP_NULL == pJob->opStatus.op) {
|
if (SCH_OP_NULL == pJob->opStatus.op) {
|
||||||
|
@ -526,7 +527,10 @@ void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) {
|
||||||
|
|
||||||
if (SCH_JOB_IN_SYNC_OP(pJob)) {
|
if (SCH_JOB_IN_SYNC_OP(pJob)) {
|
||||||
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
|
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
|
||||||
(void)tsem_post(&pJob->rspSem); // ignore error
|
code = tsem_post(&pJob->rspSem);
|
||||||
|
if (code) {
|
||||||
|
SCH_JOB_ELOG("tsem_post failed for syncOp, error:%s", tstrerror(code));
|
||||||
|
}
|
||||||
} else if (SCH_JOB_IN_ASYNC_EXEC_OP(pJob)) {
|
} else if (SCH_JOB_IN_ASYNC_EXEC_OP(pJob)) {
|
||||||
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
|
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
|
||||||
(void)schNotifyUserExecRes(pJob); // ignore error
|
(void)schNotifyUserExecRes(pJob); // ignore error
|
||||||
|
@ -771,7 +775,10 @@ void schFreeJobImpl(void *job) {
|
||||||
taosMemoryFreeClear(pJob->userRes.execRes);
|
taosMemoryFreeClear(pJob->userRes.execRes);
|
||||||
taosMemoryFreeClear(pJob->fetchRes);
|
taosMemoryFreeClear(pJob->fetchRes);
|
||||||
taosMemoryFreeClear(pJob->sql);
|
taosMemoryFreeClear(pJob->sql);
|
||||||
(void)tsem_destroy(&pJob->rspSem); // ignore error
|
int32_t code = tsem_destroy(&pJob->rspSem);
|
||||||
|
if (code) {
|
||||||
|
qError("tsem_destroy failed, error:%s", tstrerror(code));
|
||||||
|
}
|
||||||
taosMemoryFree(pJob);
|
taosMemoryFree(pJob);
|
||||||
|
|
||||||
int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
|
int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
|
||||||
|
@ -790,7 +797,12 @@ int32_t schJobFetchRows(SSchJob *pJob) {
|
||||||
|
|
||||||
if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
|
if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
|
||||||
SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
|
SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
|
||||||
(void)tsem_wait(&pJob->rspSem); // ignore error
|
code = tsem_wait(&pJob->rspSem);
|
||||||
|
if (code) {
|
||||||
|
qError("tsem_wait for fetch rspSem failed, error:%s", tstrerror(code));
|
||||||
|
SCH_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
|
SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -895,7 +907,10 @@ _return:
|
||||||
} else if (pJob->refId < 0) {
|
} else if (pJob->refId < 0) {
|
||||||
schFreeJobImpl(pJob);
|
schFreeJobImpl(pJob);
|
||||||
} else {
|
} else {
|
||||||
(void)taosRemoveRef(schMgmt.jobRef, pJob->refId); // ignore error
|
code = taosRemoveRef(schMgmt.jobRef, pJob->refId);
|
||||||
|
if (code) {
|
||||||
|
SCH_JOB_DLOG("taosRemoveRef job refId:0x%" PRIx64 " from jobRef, error:%s", pJob->refId, tstrerror(code));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_RET(code);
|
SCH_RET(code);
|
||||||
|
@ -909,7 +924,11 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
|
||||||
|
|
||||||
if (pReq->syncReq) {
|
if (pReq->syncReq) {
|
||||||
SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
|
SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
|
||||||
(void)tsem_wait(&pJob->rspSem); // ignore error
|
code = tsem_wait(&pJob->rspSem);
|
||||||
|
if (code) {
|
||||||
|
qError("qid:0x%" PRIx64 " tsem_wait sync rspSem failed, error:%s", pReq->pDag->queryId, tstrerror(code));
|
||||||
|
SCH_ERR_RET(code);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);
|
SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);
|
||||||
|
|
|
@ -422,7 +422,9 @@ void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) {
|
||||||
|
|
||||||
schDropTaskOnExecNode(pJob, pTask);
|
schDropTaskOnExecNode(pJob, pTask);
|
||||||
if (pTask->delayTimer) {
|
if (pTask->delayTimer) {
|
||||||
(void)taosTmrStopA(&pTask->delayTimer); // ignore error
|
if (!taosTmrStopA(&pTask->delayTimer)) {
|
||||||
|
SCH_TASK_WLOG("stop task delayTimer failed, may stopped, status:%d", pTask->status);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
taosHashClear(pTask->execNodes);
|
taosHashClear(pTask->execNodes);
|
||||||
(void)schRemoveTaskFromExecList(pJob, pTask); // ignore error
|
(void)schRemoveTaskFromExecList(pJob, pTask); // ignore error
|
||||||
|
@ -1319,7 +1321,9 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer, &pTask->delayTimer);
|
if (taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer, &pTask->delayTimer)) {
|
||||||
|
SCH_TASK_ELOG("taosTmrReset delayExec timer failed, handle:%p", schMgmt.timer);
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1350,7 +1354,9 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
|
||||||
|
|
||||||
SCH_LOCK_TASK(pTask);
|
SCH_LOCK_TASK(pTask);
|
||||||
if (pTask->delayTimer) {
|
if (pTask->delayTimer) {
|
||||||
(void)taosTmrStopA(&pTask->delayTimer);
|
if (!taosTmrStopA(&pTask->delayTimer)) {
|
||||||
|
SCH_TASK_WLOG("stop delayTimer failed, status:%d", pTask->status);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
schDropTaskOnExecNode(pJob, pTask);
|
schDropTaskOnExecNode(pJob, pTask);
|
||||||
SCH_UNLOCK_TASK(pTask);
|
SCH_UNLOCK_TASK(pTask);
|
||||||
|
|
|
@ -86,6 +86,7 @@ void schFreeHbTrans(SSchHbTrans *pTrans) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void schCleanClusterHb(void *pTrans) {
|
void schCleanClusterHb(void *pTrans) {
|
||||||
|
int32_t code = 0;
|
||||||
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
|
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
|
||||||
|
|
||||||
SSchHbTrans *hb = taosHashIterate(schMgmt.hbConnections, NULL);
|
SSchHbTrans *hb = taosHashIterate(schMgmt.hbConnections, NULL);
|
||||||
|
@ -93,7 +94,10 @@ void schCleanClusterHb(void *pTrans) {
|
||||||
if (hb->trans.pTrans == pTrans) {
|
if (hb->trans.pTrans == pTrans) {
|
||||||
SQueryNodeEpId *pEpId = taosHashGetKey(hb, NULL);
|
SQueryNodeEpId *pEpId = taosHashGetKey(hb, NULL);
|
||||||
schFreeHbTrans(hb);
|
schFreeHbTrans(hb);
|
||||||
(void)taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
|
code = taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
|
||||||
|
if (code) {
|
||||||
|
qError("taosHashRemove hb connection failed, error:%s", tstrerror(code));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
hb = taosHashIterate(schMgmt.hbConnections, hb);
|
hb = taosHashIterate(schMgmt.hbConnections, hb);
|
||||||
|
@ -116,7 +120,10 @@ int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *ep
|
||||||
int64_t taskNum = atomic_load_64(&hb->taskNum);
|
int64_t taskNum = atomic_load_64(&hb->taskNum);
|
||||||
if (taskNum <= 0) {
|
if (taskNum <= 0) {
|
||||||
schFreeHbTrans(hb);
|
schFreeHbTrans(hb);
|
||||||
(void)taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
|
code = taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
|
||||||
|
if (code) {
|
||||||
|
SCH_TASK_WLOG("taosHashRemove hb connection failed, error:%s", tstrerror(code));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
|
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
|
||||||
|
|
||||||
|
|
|
@ -184,6 +184,7 @@ void schedulerFreeJob(int64_t *jobId, int32_t errCode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void schedulerDestroy(void) {
|
void schedulerDestroy(void) {
|
||||||
|
int32_t code = 0;
|
||||||
atomic_store_8((int8_t *)&schMgmt.exit, 1);
|
atomic_store_8((int8_t *)&schMgmt.exit, 1);
|
||||||
|
|
||||||
if (schMgmt.jobRef >= 0) {
|
if (schMgmt.jobRef >= 0) {
|
||||||
|
@ -195,7 +196,10 @@ void schedulerDestroy(void) {
|
||||||
if (refId == 0) {
|
if (refId == 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
(void)taosRemoveRef(schMgmt.jobRef, pJob->refId); // ignore error
|
code = taosRemoveRef(schMgmt.jobRef, pJob->refId);
|
||||||
|
if (code) {
|
||||||
|
qWarn("taosRemoveRef job refId:%" PRId64 " failed, error:%s", pJob->refId, tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
pJob = taosIterateRef(schMgmt.jobRef, refId);
|
pJob = taosIterateRef(schMgmt.jobRef, refId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -488,10 +488,10 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
|
||||||
if (pe->num == 0) {
|
if (pe->num == 0) {
|
||||||
taosHashEntryWUnlock(pHashObj, pe);
|
taosHashEntryWUnlock(pHashObj, pe);
|
||||||
taosHashRUnlock(pHashObj);
|
taosHashRUnlock(pHashObj);
|
||||||
return -1;
|
return TSDB_CODE_NOT_FOUND;
|
||||||
}
|
}
|
||||||
|
|
||||||
int code = -1;
|
int code = TSDB_CODE_NOT_FOUND;
|
||||||
SHashNode *pNode = pe->next;
|
SHashNode *pNode = pe->next;
|
||||||
SHashNode *prevNode = NULL;
|
SHashNode *prevNode = NULL;
|
||||||
|
|
||||||
|
|
|
@ -229,9 +229,8 @@ int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
|
void taosReadQitem(STaosQueue *queue, void **ppItem) {
|
||||||
STaosQnode *pNode = NULL;
|
STaosQnode *pNode = NULL;
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
(void)taosThreadMutexLock(&queue->mutex);
|
(void)taosThreadMutexLock(&queue->mutex);
|
||||||
|
|
||||||
|
@ -247,14 +246,11 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
|
||||||
if (queue->qset) {
|
if (queue->qset) {
|
||||||
(void)atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
|
(void)atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
|
||||||
}
|
}
|
||||||
code = 1;
|
|
||||||
uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
|
uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
|
||||||
queue->memOfItems);
|
queue->memOfItems);
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)taosThreadMutexUnlock(&queue->mutex);
|
(void)taosThreadMutexUnlock(&queue->mutex);
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosAllocateQall(STaosQall **qall) {
|
int32_t taosAllocateQall(STaosQall **qall) {
|
||||||
|
|
|
@ -444,7 +444,7 @@ static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove) {
|
||||||
} else {
|
} else {
|
||||||
uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid);
|
uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid);
|
||||||
terrno = TSDB_CODE_REF_NOT_EXIST;
|
terrno = TSDB_CODE_REF_NOT_EXIST;
|
||||||
code = -1;
|
code = terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosUnlockList(pSet->lockedBy + hash);
|
taosUnlockList(pSet->lockedBy + hash);
|
||||||
|
|
|
@ -297,7 +297,7 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key, size_t keyLen) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) {
|
int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) {
|
||||||
int32_t code = TSDB_CODE_FAILED;
|
int32_t code = TSDB_CODE_INVALID_PARA;
|
||||||
if (!pHashObj || !key) {
|
if (!pHashObj || !key) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -492,6 +492,9 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* han
|
||||||
|
|
||||||
if (timer == NULL) {
|
if (timer == NULL) {
|
||||||
*pTmrId = taosTmrStart(fp, mseconds, param, handle);
|
*pTmrId = taosTmrStart(fp, mseconds, param, handle);
|
||||||
|
if (NULL == *pTmrId) {
|
||||||
|
stopped = true;
|
||||||
|
}
|
||||||
return stopped;
|
return stopped;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue