feat(api): taos_affected_rows64 add and modfiy tmsg.h

This commit is contained in:
Alex Duan 2022-12-02 23:20:19 +08:00
parent 855b0c8cbe
commit 213b97b79a
10 changed files with 13 additions and 13 deletions

View File

@ -68,7 +68,7 @@ typedef struct SInputData {
typedef struct SOutputData { typedef struct SOutputData {
int32_t numOfBlocks; int32_t numOfBlocks;
int32_t numOfRows; int64_t numOfRows; // int32_t changed to int64_t
int32_t numOfCols; int32_t numOfCols;
int8_t compressed; int8_t compressed;
char* pData; char* pData;

View File

@ -450,7 +450,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
(*pRsp)->precision = 0; (*pRsp)->precision = 0;
(*pRsp)->compressed = 0; (*pRsp)->compressed = 0;
(*pRsp)->compLen = 0; (*pRsp)->compLen = 0;
(*pRsp)->numOfRows = htonl(pBlock->info.rows); (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
(*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS); (*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
int32_t len = blockEncode(pBlock, (*pRsp)->data, SHOW_VARIABLES_RESULT_COLS); int32_t len = blockEncode(pBlock, (*pRsp)->data, SHOW_VARIABLES_RESULT_COLS);

View File

@ -25,7 +25,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
pRetrieve->precision = precision; pRetrieve->precision = precision;
pRetrieve->compressed = 0; pRetrieve->compressed = 0;
pRetrieve->completed = 1; pRetrieve->completed = 1;
pRetrieve->numOfRows = htonl(pBlock->info.rows); pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
actualLen += sizeof(SRetrieveTableRsp); actualLen += sizeof(SRetrieveTableRsp);

View File

@ -36,7 +36,7 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe
(*pRsp)->precision = 0; (*pRsp)->precision = 0;
(*pRsp)->compressed = 0; (*pRsp)->compressed = 0;
(*pRsp)->compLen = 0; (*pRsp)->compLen = 0;
(*pRsp)->numOfRows = htonl(pBlock->info.rows); (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
(*pRsp)->numOfCols = htonl(numOfCols); (*pRsp)->numOfCols = htonl(numOfCols);
int32_t len = blockEncode(pBlock, (*pRsp)->data, numOfCols); int32_t len = blockEncode(pBlock, (*pRsp)->data, numOfCols);

View File

@ -1615,7 +1615,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
} }
rsp->completed = 1; rsp->completed = 1;
rsp->numOfRows = htonl(rowNum); rsp->numOfRows = htobe64((int64_t)rowNum);
int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock)); int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock));
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp)); ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));

View File

@ -37,7 +37,7 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete)
rsp->precision = input->precision; rsp->precision = input->precision;
rsp->compressed = input->compressed; rsp->compressed = input->compressed;
rsp->compLen = htonl(len); rsp->compLen = htonl(len);
rsp->numOfRows = htonl(input->numOfRows); rsp->numOfRows = htobe64(input->numOfRows);
rsp->numOfCols = htonl(input->numOfCols); rsp->numOfCols = htonl(input->numOfCols);
rsp->numOfBlocks = htonl(input->numOfBlocks); rsp->numOfBlocks = htonl(input->numOfBlocks);
} }

View File

@ -275,7 +275,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
QW_ERR_RET(code); QW_ERR_RET(code);
} }
QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %d", pOutput->numOfBlocks, QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %"PRId64, pOutput->numOfBlocks,
pOutput->numOfRows); pOutput->numOfRows);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
@ -320,7 +320,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
pOutput->numOfBlocks++; pOutput->numOfBlocks++;
if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) { if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %d", pOutput->numOfBlocks, QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks,
pOutput->numOfRows); pOutput->numOfRows);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
break; break;
@ -332,7 +332,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
} }
if (pOutput->numOfRows >= QW_MIN_RES_ROWS) { if (pOutput->numOfRows >= QW_MIN_RES_ROWS) {
QW_TASK_DLOG("task fetched blocks %d rows %d reaches the min rows", pOutput->numOfBlocks, pOutput->numOfRows); QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks, pOutput->numOfRows);
break; break;
} }
} }

View File

@ -526,7 +526,7 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
void schProcessOnDataFetched(SSchJob *pJob) { schPostJobRes(pJob, SCH_OP_FETCH); } void schProcessOnDataFetched(SSchJob *pJob) { schPostJobRes(pJob, SCH_OP_FETCH); }
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) { int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed); SCH_TASK_DLOG("got explain rsp, rows:%" PRId64 ", complete:%d", htobe64(pRsp->numOfRows), pRsp->completed);
atomic_store_64(&pJob->resNumOfRows, htobe64(pRsp->numOfRows)); atomic_store_64(&pJob->resNumOfRows, htobe64(pRsp->numOfRows));
atomic_store_ptr(&pJob->fetchRes, pRsp); atomic_store_ptr(&pJob->fetchRes, pRsp);

View File

@ -114,7 +114,7 @@ int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rs
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
} }
SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed); SCH_TASK_DLOG("got fetch rsp, rows:%" PRId64 ", complete:%d", htobe64(rsp->numOfRows), rsp->completed);
msg = NULL; msg = NULL;
schProcessOnDataFetched(pJob); schProcessOnDataFetched(pJob);

View File

@ -112,7 +112,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
pRetrieve->compressed = 0; pRetrieve->compressed = 0;
pRetrieve->completed = 1; pRetrieve->completed = 1;
pRetrieve->streamBlockType = pBlock->info.type; pRetrieve->streamBlockType = pBlock->info.type;
pRetrieve->numOfRows = htonl(pBlock->info.rows); pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
pRetrieve->numOfCols = htonl(numOfCols); pRetrieve->numOfCols = htonl(numOfCols);
pRetrieve->skey = htobe64(pBlock->info.window.skey); pRetrieve->skey = htobe64(pBlock->info.window.skey);
pRetrieve->ekey = htobe64(pBlock->info.window.ekey); pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
@ -189,7 +189,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
pRetrieve->compressed = 0; pRetrieve->compressed = 0;
pRetrieve->completed = 1; pRetrieve->completed = 1;
pRetrieve->streamBlockType = pBlock->info.type; pRetrieve->streamBlockType = pBlock->info.type;
pRetrieve->numOfRows = htonl(pBlock->info.rows); pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
pRetrieve->skey = htobe64(pBlock->info.window.skey); pRetrieve->skey = htobe64(pBlock->info.window.skey);
pRetrieve->ekey = htobe64(pBlock->info.window.ekey); pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
pRetrieve->version = htobe64(pBlock->info.version); pRetrieve->version = htobe64(pBlock->info.version);