feature/qnode

This commit is contained in:
dapan1121 2022-01-06 16:07:16 +08:00
parent 9aa87cbf7e
commit e38aeda53e
2 changed files with 8 additions and 6 deletions

View File

@ -450,7 +450,7 @@ int32_t ctgMetaRentInit(SMetaRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
int32_t ctgMetaRentAdd(SMetaRentMgmt *mgmt, void *meta, int64_t id, int32_t size) { int32_t ctgMetaRentAdd(SMetaRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
int16_t widx = id % mgmt->slotNum; int16_t widx = abs(id % mgmt->slotNum);
SRentSlotInfo *slot = &mgmt->slots[widx]; SRentSlotInfo *slot = &mgmt->slots[widx];
int32_t code = 0; int32_t code = 0;
@ -482,7 +482,7 @@ _return:
} }
int32_t ctgMetaRentUpdate(SMetaRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t compare) { int32_t ctgMetaRentUpdate(SMetaRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t compare) {
int16_t widx = id % mgmt->slotNum; int16_t widx = abs(id % mgmt->slotNum);
SRentSlotInfo *slot = &mgmt->slots[widx]; SRentSlotInfo *slot = &mgmt->slots[widx];
int32_t code = 0; int32_t code = 0;

View File

@ -509,7 +509,7 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms
break; break;
} }
case TDMT_VND_SUBMIT_RSP: { case TDMT_VND_SUBMIT_RSP: {
if (rspCode != TSDB_CODE_SUCCESS) { if (rspCode != TSDB_CODE_SUCCESS || NULL == msg) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
} else { } else {
SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg; SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg;
@ -525,7 +525,7 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms
case TDMT_VND_QUERY_RSP: { case TDMT_VND_QUERY_RSP: {
SQueryTableRsp *rsp = (SQueryTableRsp *)msg; SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
if (rsp->code != TSDB_CODE_SUCCESS) { if (rsp->code != TSDB_CODE_SUCCESS || NULL == msg) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
} else { } else {
code = schBuildAndSendMsg(job, task, TDMT_VND_RES_READY); code = schBuildAndSendMsg(job, task, TDMT_VND_RES_READY);
@ -538,7 +538,7 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms
case TDMT_VND_RES_READY_RSP: { case TDMT_VND_RES_READY_RSP: {
SResReadyRsp *rsp = (SResReadyRsp *)msg; SResReadyRsp *rsp = (SResReadyRsp *)msg;
if (rsp->code != TSDB_CODE_SUCCESS) { if (rsp->code != TSDB_CODE_SUCCESS || NULL == msg) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
} else { } else {
code = schProcessOnTaskSuccess(job, task); code = schProcessOnTaskSuccess(job, task);
@ -553,7 +553,9 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
job->res = rsp; job->res = rsp;
job->resNumOfRows = rsp->numOfRows; if (rsp) {
job->resNumOfRows = rsp->numOfRows;
}
SCH_ERR_JRET(schProcessOnDataFetched(job)); SCH_ERR_JRET(schProcessOnDataFetched(job));
break; break;