fix invalid vgroup id
This commit is contained in:
parent
919df1f039
commit
ac989908a2
|
@ -483,7 +483,7 @@ int32_t ctgInitTask(SCtgJob* pJob, CTG_TASK_TYPE type, void* param, int32_t* tas
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
CTG_UNLOCK(CTG_WRITE, &pJob->taskLock);
|
CTG_UNLOCK(CTG_WRITE, &pJob->taskLock);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -905,6 +905,31 @@ int32_t ctgCallUserCb(void* param) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ctgUpdateJobErrCode(SCtgJob* pJob, int32_t errCode) {
|
||||||
|
if (TSDB_CODE_SUCCESS == errCode) return;
|
||||||
|
|
||||||
|
int32_t origCode = atomic_load_32(&pJob->jobResCode);
|
||||||
|
if (TSDB_CODE_SUCCESS == origCode) {
|
||||||
|
if (origCode == atomic_val_compare_exchange_32(&pJob->jobResCode, origCode, errCode)) {
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
origCode = atomic_load_32(&pJob->jobResCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NEED_CLIENT_HANDLE_ERROR(origCode)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NEED_CLIENT_HANDLE_ERROR(errCode)) {
|
||||||
|
atomic_store_32(&pJob->jobResCode, errCode);
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
|
||||||
|
_return:
|
||||||
|
qDebug("QID:0x%" PRIx64 " ctg job errCode updated to %s", pJob->queryId, tstrerror(errCode));
|
||||||
|
}
|
||||||
|
|
||||||
int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
|
int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
|
||||||
SCtgJob* pJob = pTask->pJob;
|
SCtgJob* pJob = pTask->pJob;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -924,6 +949,8 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
|
||||||
if (taskDone < taosArrayGetSize(pJob->pTasks)) {
|
if (taskDone < taosArrayGetSize(pJob->pTasks)) {
|
||||||
qDebug("QID:0x%" PRIx64 " task done: %d, total: %d", pJob->queryId, taskDone,
|
qDebug("QID:0x%" PRIx64 " task done: %d, total: %d", pJob->queryId, taskDone,
|
||||||
(int32_t)taosArrayGetSize(pJob->pTasks));
|
(int32_t)taosArrayGetSize(pJob->pTasks));
|
||||||
|
|
||||||
|
ctgUpdateJobErrCode(pJob, rspCode);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -931,7 +958,8 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
pJob->jobResCode = code;
|
ctgUpdateJobErrCode(pJob, rspCode);
|
||||||
|
// pJob->jobResCode = code;
|
||||||
|
|
||||||
// taosSsleep(2);
|
// taosSsleep(2);
|
||||||
// qDebug("QID:0x%" PRIx64 " ctg after sleep", pJob->queryId);
|
// qDebug("QID:0x%" PRIx64 " ctg after sleep", pJob->queryId);
|
||||||
|
@ -1098,7 +1126,8 @@ _return:
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code) {
|
if (code) {
|
||||||
ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, tstrerror(code));
|
ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname,
|
||||||
|
tstrerror(code));
|
||||||
}
|
}
|
||||||
if (pTask->res || code) {
|
if (pTask->res || code) {
|
||||||
ctgHandleTaskEnd(pTask, code);
|
ctgHandleTaskEnd(pTask, code);
|
||||||
|
@ -1286,7 +1315,8 @@ _return:
|
||||||
TSWAP(pTask->res, ctx->pResList);
|
TSWAP(pTask->res, ctx->pResList);
|
||||||
taskDone = true;
|
taskDone = true;
|
||||||
}
|
}
|
||||||
ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, tstrerror(code));
|
ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname,
|
||||||
|
tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->res && taskDone) {
|
if (pTask->res && taskDone) {
|
||||||
|
|
Loading…
Reference in New Issue