feat(api): add int64 affected rows modify tmsg.h struct
This commit is contained in:
parent
9fb4fa0a7f
commit
855b0c8cbe
|
@ -1941,7 +1941,7 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
|
||||||
|
|
||||||
pResultInfo->pRspMsg = (const char*)pRsp;
|
pResultInfo->pRspMsg = (const char*)pRsp;
|
||||||
pResultInfo->pData = (void*)pRsp->data;
|
pResultInfo->pData = (void*)pRsp->data;
|
||||||
pResultInfo->numOfRows = htonll(pRsp->numOfRows);
|
pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
|
||||||
pResultInfo->current = 0;
|
pResultInfo->current = 0;
|
||||||
pResultInfo->completed = (pRsp->completed == 1);
|
pResultInfo->completed = (pRsp->completed == 1);
|
||||||
pResultInfo->payloadLen = htonl(pRsp->compLen);
|
pResultInfo->payloadLen = htonl(pRsp->compLen);
|
||||||
|
|
|
@ -367,7 +367,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
pSourceDataInfo->pRsp = pMsg->pData;
|
pSourceDataInfo->pRsp = pMsg->pData;
|
||||||
|
|
||||||
SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
|
SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
|
||||||
pRsp->numOfRows = htonll(pRsp->numOfRows);
|
pRsp->numOfRows = htobe64(pRsp->numOfRows);
|
||||||
pRsp->compLen = htonl(pRsp->compLen);
|
pRsp->compLen = htonl(pRsp->compLen);
|
||||||
pRsp->numOfCols = htonl(pRsp->numOfCols);
|
pRsp->numOfCols = htonl(pRsp->numOfCols);
|
||||||
pRsp->useconds = htobe64(pRsp->useconds);
|
pRsp->useconds = htobe64(pRsp->useconds);
|
||||||
|
@ -655,7 +655,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
|
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
|
||||||
if (pRsp->completed == 1) {
|
if (pRsp->completed == 1) {
|
||||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64
|
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
|
||||||
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
|
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
|
||||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
|
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
|
||||||
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
|
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
|
||||||
|
@ -664,7 +664,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
|
||||||
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||||
pExchangeInfo->current += 1;
|
pExchangeInfo->current += 1;
|
||||||
} else {
|
} else {
|
||||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
|
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64 ", totalRows:%" PRIu64
|
||||||
", totalBytes:%" PRIu64,
|
", totalBytes:%" PRIu64,
|
||||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
|
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
|
||||||
pLoadInfo->totalRows, pLoadInfo->totalSize);
|
pLoadInfo->totalRows, pLoadInfo->totalSize);
|
||||||
|
|
|
@ -528,7 +528,7 @@ void schProcessOnDataFetched(SSchJob *pJob) { schPostJobRes(pJob, SCH_OP_FETCH);
|
||||||
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
|
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
|
||||||
SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed);
|
SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed);
|
||||||
|
|
||||||
atomic_store_64(&pJob->resNumOfRows, htonll(pRsp->numOfRows));
|
atomic_store_64(&pJob->resNumOfRows, htobe64(pRsp->numOfRows));
|
||||||
atomic_store_ptr(&pJob->fetchRes, pRsp);
|
atomic_store_ptr(&pJob->fetchRes, pRsp);
|
||||||
|
|
||||||
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
|
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
|
||||||
|
|
|
@ -108,7 +108,7 @@ int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rs
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_store_ptr(&pJob->fetchRes, rsp);
|
atomic_store_ptr(&pJob->fetchRes, rsp);
|
||||||
atomic_add_fetch_64(&pJob->resNumOfRows, htonll(rsp->numOfRows));
|
atomic_add_fetch_64(&pJob->resNumOfRows, htobe64(rsp->numOfRows));
|
||||||
|
|
||||||
if (rsp->completed) {
|
if (rsp->completed) {
|
||||||
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
|
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
|
||||||
|
|
Loading…
Reference in New Issue