From 39482b24db8660c685f6caf5a848401f647f90f7 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Fri, 2 Dec 2022 16:42:38 +0800 Subject: [PATCH 01/12] feat(shell): supported affected rows int64_t --- include/client/taos.h | 1 + source/client/inc/clientInt.h | 4 ++-- source/client/src/clientMain.c | 12 ++++++++++++ source/libs/executor/src/dataDispatcher.c | 2 +- tools/shell/src/shellEngine.c | 16 ++++++++++++++-- 5 files changed, 30 insertions(+), 5 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 2940f1dfd0..69774b750f 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -185,6 +185,7 @@ DLL_EXPORT void taos_kill_query(TAOS *taos); DLL_EXPORT int taos_field_count(TAOS_RES *res); DLL_EXPORT int taos_num_fields(TAOS_RES *res); DLL_EXPORT int taos_affected_rows(TAOS_RES *res); +DLL_EXPORT int64_t taos_affected_rows64(TAOS_RES *res); DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res); DLL_EXPORT int taos_select_db(TAOS *taos, const char *db); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index aae20c587d..6c2a7d75a0 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -171,9 +171,9 @@ typedef struct SReqResultInfo { char** convertBuf; TAOS_ROW row; SResultColumn* pCol; - uint32_t numOfRows; + uint64_t numOfRows; uint64_t totalRows; - uint32_t current; + uint64_t current; bool localResultFetched; bool completed; int32_t precision; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 976d1dd1b0..fecb94d02c 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -438,11 +438,23 @@ const char *taos_data_type(int type) { const char *taos_get_client_info() { return version; } +// return int32_t int taos_affected_rows(TAOS_RES *res) { if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) { return 0; } + SRequestObj *pRequest = (SRequestObj *)res; + SReqResultInfo *pResInfo = &pRequest->body.resInfo; + return (int)pResInfo->numOfRows; +} + +// return int64_t +int64_t taos_affected_rows(TAOS_RES *res) { + if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) { + return 0; + } + SRequestObj *pRequest = (SRequestObj *)res; SReqResultInfo *pResInfo = &pRequest->body.resInfo; return pResInfo->numOfRows; diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index d4248fc420..78f6155cf7 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -189,7 +189,7 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { } SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData); memcpy(pOutput->pData, pEntry->data, pEntry->dataLen); - pOutput->numOfRows = pEntry->numOfRows; + pOutput->fr = pEntry->numOfRows; pOutput->numOfCols = pEntry->numOfCols; pOutput->compressed = pEntry->compressed; diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 118a6caf7a..362a9b3c80 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -214,6 +214,18 @@ void shellRunSingleCommandImp(char *command) { return; } + // pre string + char * pre = "Query OK"; + if (shellRegexMatch(command, "^\\s*delete\\s*from\\s*.*", REG_EXTENDED | REG_ICASE)) { + pre = "Deleted OK"; + } else if(shellRegexMatch(command, "^\\s*insert\\s*into\\s*.*", REG_EXTENDED | REG_ICASE)) { + pre = "Inserted OK"; + } else if(shellRegexMatch(command, "^\\s*create\\s*.*", REG_EXTENDED | REG_ICASE)) { + pre = "Created OK"; + } else if(shellRegexMatch(command, "^\\s*drop\\s*.*", REG_EXTENDED | REG_ICASE)) { + pre = "Droped OK"; + } + TAOS_FIELD *pFields = taos_fetch_fields(pSql); if (pFields != NULL) { // select and show kinds of commands int32_t error_no = 0; @@ -229,10 +241,10 @@ void shellRunSingleCommandImp(char *command) { } taos_free_result(pSql); } else { - int32_t num_rows_affacted = taos_affected_rows(pSql); + int64_t num_rows_affacted = taos_affected_rows64(pSql); taos_free_result(pSql); et = taosGetTimestampUs(); - printf("Query OK, %d row(s) affected (%.6fs)\r\n", num_rows_affacted, (et - st) / 1E6); + printf("%s, %" PRId64 " row(s) affected (%.6fs)\r\n", num_rows_affacted, (et - st) / 1E6); // call auto tab callbackAutoTab(command, NULL, false); From 878f2a2c19c8d9f6859546651c1e66d18a245781 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Fri, 2 Dec 2022 17:01:40 +0800 Subject: [PATCH 02/12] fix(shell): reset dataDispatcher mistake modify --- source/libs/executor/src/dataDispatcher.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 78f6155cf7..d4248fc420 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -189,7 +189,7 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { } SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData); memcpy(pOutput->pData, pEntry->data, pEntry->dataLen); - pOutput->fr = pEntry->numOfRows; + pOutput->numOfRows = pEntry->numOfRows; pOutput->numOfCols = pEntry->numOfCols; pOutput->compressed = pEntry->compressed; From 2aa65c4b9c6d7ed7d1d09cf41640cfc9b59f3d67 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Fri, 2 Dec 2022 17:09:04 +0800 Subject: [PATCH 03/12] feat(shell): affected rows int64 --- source/client/src/clientImpl.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 846accf51b..77699e89c2 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -317,7 +317,7 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) { tscError("0x%" PRIx64 " fetch results failed, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code), pRequest->requestId); } else { - tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, + tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId); } @@ -1527,7 +1527,7 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) return NULL; } - tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, + tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId); STscObj* pTscObj = pRequest->pTscObj; From 0f0e575481febaae34d084b42378342942dff712 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Fri, 2 Dec 2022 17:14:49 +0800 Subject: [PATCH 04/12] feat(shell): affected rows int64 --- source/client/src/clientMain.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index fecb94d02c..8020134c52 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -450,7 +450,7 @@ int taos_affected_rows(TAOS_RES *res) { } // return int64_t -int64_t taos_affected_rows(TAOS_RES *res) { +int64_t taos_affected_rows64(TAOS_RES *res) { if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) { return 0; } @@ -971,7 +971,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) { tscError("0x%" PRIx64 " fetch results failed, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code), pRequest->requestId); } else { - tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, + tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId); From 83a4f76ecea33eeaaa04db2d03dbb08455aeb917 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Fri, 2 Dec 2022 17:15:51 +0800 Subject: [PATCH 05/12] feat(shell): affected rows int64 --- tools/shell/src/shellEngine.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 362a9b3c80..14757247b7 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -244,7 +244,7 @@ void shellRunSingleCommandImp(char *command) { int64_t num_rows_affacted = taos_affected_rows64(pSql); taos_free_result(pSql); et = taosGetTimestampUs(); - printf("%s, %" PRId64 " row(s) affected (%.6fs)\r\n", num_rows_affacted, (et - st) / 1E6); + printf("%s, %" PRId64 " row(s) affected (%.6fs)\r\n", pre, num_rows_affacted, (et - st) / 1E6); // call auto tab callbackAutoTab(command, NULL, false); From 693a602c62bc888a2d436404df49fc2172952cc9 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Fri, 2 Dec 2022 18:27:18 +0800 Subject: [PATCH 06/12] feat(api): modify prompt text --- tools/shell/src/shellEngine.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 14757247b7..5be46fe54a 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -217,13 +217,13 @@ void shellRunSingleCommandImp(char *command) { // pre string char * pre = "Query OK"; if (shellRegexMatch(command, "^\\s*delete\\s*from\\s*.*", REG_EXTENDED | REG_ICASE)) { - pre = "Deleted OK"; + pre = "Delete OK"; } else if(shellRegexMatch(command, "^\\s*insert\\s*into\\s*.*", REG_EXTENDED | REG_ICASE)) { - pre = "Inserted OK"; + pre = "Insert OK"; } else if(shellRegexMatch(command, "^\\s*create\\s*.*", REG_EXTENDED | REG_ICASE)) { - pre = "Created OK"; + pre = "Create OK"; } else if(shellRegexMatch(command, "^\\s*drop\\s*.*", REG_EXTENDED | REG_ICASE)) { - pre = "Droped OK"; + pre = "Drop OK"; } TAOS_FIELD *pFields = taos_fetch_fields(pSql); From 9fb4fa0a7f8443848b0171158d994fe164e10fa8 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Fri, 2 Dec 2022 20:48:39 +0800 Subject: [PATCH 07/12] feat(api): add int64 affected rows modify tmsg.h struct --- include/common/tmsg.h | 4 ++-- source/client/inc/clientInt.h | 2 +- source/client/src/clientImpl.c | 2 +- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/exchangeoperator.c | 10 +++++----- source/libs/scheduler/inc/schInt.h | 2 +- source/libs/scheduler/src/schJob.c | 4 ++-- source/libs/scheduler/src/schRemote.c | 8 ++++---- 8 files changed, 17 insertions(+), 17 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0a082a37e4..9ad2e4375b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1403,8 +1403,8 @@ typedef struct { int8_t streamBlockType; int32_t compLen; int32_t numOfBlocks; - int32_t numOfRows; - int32_t numOfCols; + int64_t numOfRows; // from int32_t change to int64_t + int64_t numOfCols; int64_t skey; int64_t ekey; int64_t version; // for stream diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 6c2a7d75a0..ea76f726ea 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -171,7 +171,7 @@ typedef struct SReqResultInfo { char** convertBuf; TAOS_ROW row; SResultColumn* pCol; - uint64_t numOfRows; + uint64_t numOfRows; // from int32_t change to int64_t uint64_t totalRows; uint64_t current; bool localResultFetched; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 77699e89c2..6444150263 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1941,7 +1941,7 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR pResultInfo->pRspMsg = (const char*)pRsp; pResultInfo->pData = (void*)pRsp->data; - pResultInfo->numOfRows = htonl(pRsp->numOfRows); + pResultInfo->numOfRows = htonll(pRsp->numOfRows); pResultInfo->current = 0; pResultInfo->completed = (pRsp->completed == 1); pResultInfo->payloadLen = htonl(pRsp->compLen); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 7100be58e3..60f8171bac 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -684,7 +684,7 @@ void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pC int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart); -void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs, +void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs, SOperatorInfo* pOperator); STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 963a273290..03e9e411bb 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -115,14 +115,14 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn if (pRsp->completed == 1) { pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 - " execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 + " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, i + 1, totalSources); } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 - " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb", + " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0); } @@ -367,14 +367,14 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { pSourceDataInfo->pRsp = pMsg->pData; SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp; - pRsp->numOfRows = htonl(pRsp->numOfRows); + pRsp->numOfRows = htonll(pRsp->numOfRows); pRsp->compLen = htonl(pRsp->compLen); pRsp->numOfCols = htonl(pRsp->numOfCols); pRsp->useconds = htobe64(pRsp->useconds); pRsp->numOfBlocks = htonl(pRsp->numOfBlocks); ASSERT(pRsp != NULL); - qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d, %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, + qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo); } else { taosMemoryFree(pMsg->pData); @@ -472,7 +472,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas return TSDB_CODE_SUCCESS; } -void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs, +void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs, SOperatorInfo* pOperator) { pInfo->totalRows += numOfRows; pInfo->totalSize += dataLen; diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index dfc48e7d9f..77aafa9a27 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -297,7 +297,7 @@ typedef struct SSchJob { SExecResult execRes; void *fetchRes; // TODO free it or not bool fetched; - int32_t resNumOfRows; + int64_t resNumOfRows; // from int32_t to int64_t SSchResInfo userRes; char *sql; SQueryProfileSummary summary; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 0eb29a3667..37a3cc9286 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -412,7 +412,7 @@ int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) { SCH_JOB_DLOG("empty res and set query complete, code:%x", code); } - SCH_JOB_DLOG("fetch done, totalRows:%d", pJob->resNumOfRows); + SCH_JOB_DLOG("fetch done, totalRows:%" PRId64, pJob->resNumOfRows); _return: @@ -528,7 +528,7 @@ void schProcessOnDataFetched(SSchJob *pJob) { schPostJobRes(pJob, SCH_OP_FETCH); int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) { SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed); - atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows)); + atomic_store_64(&pJob->resNumOfRows, htonll(pRsp->numOfRows)); atomic_store_ptr(&pJob->fetchRes, pRsp); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 1a6d7df349..101fdfe052 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -108,7 +108,7 @@ int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rs } atomic_store_ptr(&pJob->fetchRes, rsp); - atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows)); + atomic_add_fetch_64(&pJob->resNumOfRows, htonll(rsp->numOfRows)); if (rsp->completed) { SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC); @@ -279,7 +279,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa } } - atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); + atomic_add_fetch_64(&pJob->resNumOfRows, rsp->affectedRows); SCH_TASK_DLOG("submit succeed, affectedRows:%d, blocks:%d", rsp->affectedRows, rsp->nBlocks); SCH_LOCK(SCH_WRITE, &pJob->resLock); @@ -317,7 +317,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa tDecodeSVDeleteRsp(&coder, &rsp); tDecoderClear(&coder); - atomic_add_fetch_32(&pJob->resNumOfRows, rsp.affectedRows); + atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows); SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows); } @@ -344,7 +344,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa SCH_ERR_JRET(schSaveJobExecRes(pJob, &rsp)); - atomic_add_fetch_32(&pJob->resNumOfRows, rsp.affectedRows); + atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows); taosMemoryFreeClear(msg); From 855b0c8cbe5b4ff45effd78a7b348367715c95be Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Fri, 2 Dec 2022 20:57:46 +0800 Subject: [PATCH 08/12] feat(api): add int64 affected rows modify tmsg.h struct --- source/client/src/clientImpl.c | 2 +- source/libs/executor/src/exchangeoperator.c | 6 +++--- source/libs/scheduler/src/schJob.c | 2 +- source/libs/scheduler/src/schRemote.c | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 6444150263..f823b527ea 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1941,7 +1941,7 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR pResultInfo->pRspMsg = (const char*)pRsp; pResultInfo->pData = (void*)pRsp->data; - pResultInfo->numOfRows = htonll(pRsp->numOfRows); + pResultInfo->numOfRows = htobe64(pRsp->numOfRows); pResultInfo->current = 0; pResultInfo->completed = (pRsp->completed == 1); pResultInfo->payloadLen = htonl(pRsp->compLen); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 03e9e411bb..dfbfe62e13 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -367,7 +367,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { pSourceDataInfo->pRsp = pMsg->pData; SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp; - pRsp->numOfRows = htonll(pRsp->numOfRows); + pRsp->numOfRows = htobe64(pRsp->numOfRows); pRsp->compLen = htonl(pRsp->compLen); pRsp->numOfCols = htonl(pRsp->numOfCols); pRsp->useconds = htobe64(pRsp->useconds); @@ -655,7 +655,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; if (pRsp->completed == 1) { - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64 + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, @@ -664,7 +664,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pExchangeInfo->current += 1; } else { - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64 + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize); diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 37a3cc9286..ea49b0c4e0 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -528,7 +528,7 @@ void schProcessOnDataFetched(SSchJob *pJob) { schPostJobRes(pJob, SCH_OP_FETCH); int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) { SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed); - atomic_store_64(&pJob->resNumOfRows, htonll(pRsp->numOfRows)); + atomic_store_64(&pJob->resNumOfRows, htobe64(pRsp->numOfRows)); atomic_store_ptr(&pJob->fetchRes, pRsp); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 101fdfe052..76ac0a2e9f 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -108,7 +108,7 @@ int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rs } atomic_store_ptr(&pJob->fetchRes, rsp); - atomic_add_fetch_64(&pJob->resNumOfRows, htonll(rsp->numOfRows)); + atomic_add_fetch_64(&pJob->resNumOfRows, htobe64(rsp->numOfRows)); if (rsp->completed) { SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC); From 213b97b79ac2722db50cc029f363a88a84f0ac7d Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Fri, 2 Dec 2022 23:20:19 +0800 Subject: [PATCH 09/12] feat(api): taos_affected_rows64 add and modfiy tmsg.h --- include/libs/executor/dataSinkMgt.h | 2 +- source/client/src/clientMsgHandler.c | 2 +- source/dnode/vnode/src/tq/tqExec.c | 2 +- source/libs/command/src/command.c | 2 +- source/libs/command/src/explain.c | 2 +- source/libs/qworker/src/qwMsg.c | 2 +- source/libs/qworker/src/qworker.c | 6 +++--- source/libs/scheduler/src/schJob.c | 2 +- source/libs/scheduler/src/schRemote.c | 2 +- source/libs/stream/src/streamDispatch.c | 4 ++-- 10 files changed, 13 insertions(+), 13 deletions(-) diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index 8a02f372d1..ed7cbc8125 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -68,7 +68,7 @@ typedef struct SInputData { typedef struct SOutputData { int32_t numOfBlocks; - int32_t numOfRows; + int64_t numOfRows; // int32_t changed to int64_t int32_t numOfCols; int8_t compressed; char* pData; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 4bd74a842f..d5a567075c 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -450,7 +450,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) { (*pRsp)->precision = 0; (*pRsp)->compressed = 0; (*pRsp)->compLen = 0; - (*pRsp)->numOfRows = htonl(pBlock->info.rows); + (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); (*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS); int32_t len = blockEncode(pBlock, (*pRsp)->data, SHOW_VARIABLES_RESULT_COLS); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 2514190035..093186ebbb 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -25,7 +25,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t pRetrieve->precision = precision; pRetrieve->compressed = 0; pRetrieve->completed = 1; - pRetrieve->numOfRows = htonl(pBlock->info.rows); + pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows); int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); actualLen += sizeof(SRetrieveTableRsp); diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index d58c4dc6d3..7661944379 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -36,7 +36,7 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe (*pRsp)->precision = 0; (*pRsp)->compressed = 0; (*pRsp)->compLen = 0; - (*pRsp)->numOfRows = htonl(pBlock->info.rows); + (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); (*pRsp)->numOfCols = htonl(numOfCols); int32_t len = blockEncode(pBlock, (*pRsp)->data, numOfCols); diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 410e62a18b..dd48023679 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -1615,7 +1615,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { } rsp->completed = 1; - rsp->numOfRows = htonl(rowNum); + rsp->numOfRows = htobe64((int64_t)rowNum); int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock)); ASSERT(len == rspSize - sizeof(SRetrieveTableRsp)); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index d9a7cea411..7154e22300 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -37,7 +37,7 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) rsp->precision = input->precision; rsp->compressed = input->compressed; rsp->compLen = htonl(len); - rsp->numOfRows = htonl(input->numOfRows); + rsp->numOfRows = htobe64(input->numOfRows); rsp->numOfCols = htonl(input->numOfCols); rsp->numOfBlocks = htonl(input->numOfBlocks); } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 0890d10b65..fe2acb805b 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -275,7 +275,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, QW_ERR_RET(code); } - QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %d", pOutput->numOfBlocks, + QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %"PRId64, pOutput->numOfBlocks, pOutput->numOfRows); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); @@ -320,7 +320,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, pOutput->numOfBlocks++; if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) { - QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %d", pOutput->numOfBlocks, + QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks, pOutput->numOfRows); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); break; @@ -332,7 +332,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, } if (pOutput->numOfRows >= QW_MIN_RES_ROWS) { - QW_TASK_DLOG("task fetched blocks %d rows %d reaches the min rows", pOutput->numOfBlocks, pOutput->numOfRows); + QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks, pOutput->numOfRows); break; } } diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index ea49b0c4e0..b9062859c8 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -526,7 +526,7 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { void schProcessOnDataFetched(SSchJob *pJob) { schPostJobRes(pJob, SCH_OP_FETCH); } int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) { - SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed); + SCH_TASK_DLOG("got explain rsp, rows:%" PRId64 ", complete:%d", htobe64(pRsp->numOfRows), pRsp->completed); atomic_store_64(&pJob->resNumOfRows, htobe64(pRsp->numOfRows)); atomic_store_ptr(&pJob->fetchRes, pRsp); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 76ac0a2e9f..30622f96bb 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -114,7 +114,7 @@ int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rs SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC); } - SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed); + SCH_TASK_DLOG("got fetch rsp, rows:%" PRId64 ", complete:%d", htobe64(rsp->numOfRows), rsp->completed); msg = NULL; schProcessOnDataFetched(pJob); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 2c36c299ee..4e0b0630bc 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -112,7 +112,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) pRetrieve->compressed = 0; pRetrieve->completed = 1; pRetrieve->streamBlockType = pBlock->info.type; - pRetrieve->numOfRows = htonl(pBlock->info.rows); + pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows); pRetrieve->numOfCols = htonl(numOfCols); pRetrieve->skey = htobe64(pBlock->info.window.skey); pRetrieve->ekey = htobe64(pBlock->info.window.ekey); @@ -189,7 +189,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis pRetrieve->compressed = 0; pRetrieve->completed = 1; pRetrieve->streamBlockType = pBlock->info.type; - pRetrieve->numOfRows = htonl(pBlock->info.rows); + pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows); pRetrieve->skey = htobe64(pBlock->info.window.skey); pRetrieve->ekey = htobe64(pBlock->info.window.ekey); pRetrieve->version = htobe64(pBlock->info.version); From 81dc25819d6480199c915f13587c5f70415b1e4a Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Sat, 3 Dec 2022 09:42:58 +0800 Subject: [PATCH 10/12] feat(api): add int64 affect row fix build error --- source/libs/qworker/src/qworker.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index fe2acb805b..e4c3c95b9c 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -327,7 +327,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, } if (0 == ctx->level) { - QW_TASK_DLOG("task fetched blocks %d rows %d, level %d", pOutput->numOfBlocks, pOutput->numOfRows, ctx->level); + QW_TASK_DLOG("task fetched blocks %d rows %"PRId64", level %d", pOutput->numOfBlocks, pOutput->numOfRows, ctx->level); break; } From 0a219ec88ff30f7eb87faa7bda28b8a14040fe6e Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Sat, 3 Dec 2022 17:38:38 +0800 Subject: [PATCH 11/12] fix(case): taos shell modify Query OK output string --- tests/system-test/0-others/taosShell.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/system-test/0-others/taosShell.py b/tests/system-test/0-others/taosShell.py index e3095e8b93..bcf27800f2 100644 --- a/tests/system-test/0-others/taosShell.py +++ b/tests/system-test/0-others/taosShell.py @@ -62,7 +62,7 @@ def taos_command (buildPath, key, value, expectString, cfgDir, sqlString='', key print ('taos login success! Here can run sql, taos> ') if len(sqlString) != 0: child.sendline (sqlString) - w = child.expect(["Query OK", taosExpect.TIMEOUT, taosExpect.EOF], timeout=10) + w = child.expect(["Query OK", "Create OK", "Insert OK", "Drop OK", taosExpect.TIMEOUT, taosExpect.EOF], timeout=10) if w == 0: return "TAOS_OK" else: @@ -233,7 +233,7 @@ class TDTestCase: tdLog.printNoPrefix("================================ parameter: -s") newDbName="dbss" keyDict['s'] = "\"create database " + newDbName + "\"" - retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') + retCode = taos_command(buildPath, "s", keyDict['s'], "Create OK", keyDict['c'], '', '', '') if retCode != "TAOS_OK": tdLog.exit("taos -s fail") @@ -246,17 +246,17 @@ class TDTestCase: tdLog.exit("create db fail after taos -s %s fail"%(keyDict['s'])) keyDict['s'] = "\"create table " + newDbName + ".stb (ts timestamp, c int) tags (t int)\"" - retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') + retCode = taos_command(buildPath, "s", keyDict['s'], "Create OK", keyDict['c'], '', '', '') if retCode != "TAOS_OK": tdLog.exit("taos -s create table fail") keyDict['s'] = "\"create table " + newDbName + ".ctb0 using " + newDbName + ".stb tags (0) " + newDbName + ".ctb1 using " + newDbName + ".stb tags (1)\"" - retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') + retCode = taos_command(buildPath, "s", keyDict['s'], "Create OK", keyDict['c'], '', '', '') if retCode != "TAOS_OK": tdLog.exit("taos -s create table fail") keyDict['s'] = "\"insert into " + newDbName + ".ctb0 values('2021-04-01 08:00:00.000', 10)('2021-04-01 08:00:01.000', 20) " + newDbName + ".ctb1 values('2021-04-01 08:00:00.000', 11)('2021-04-01 08:00:01.000', 21)\"" - retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') + retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", keyDict['c'], '', '', '') if retCode != "TAOS_OK": tdLog.exit("taos -s insert data fail") @@ -401,27 +401,27 @@ class TDTestCase: tdLog.printNoPrefix("================================ parameter: -w") newDbName="dbw" keyDict['s'] = "\"create database " + newDbName + "\"" - retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') + retCode = taos_command(buildPath, "s", keyDict['s'], "Create OK", keyDict['c'], '', '', '') if retCode != "TAOS_OK": tdLog.exit("taos -w fail") keyDict['s'] = "\"create table " + newDbName + ".ntb (ts timestamp, c binary(128))\"" - retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') + retCode = taos_command(buildPath, "s", keyDict['s'], "Create OK", keyDict['c'], '', '', '') if retCode != "TAOS_OK": tdLog.exit("taos -w create table fail") keyDict['s'] = "\"insert into " + newDbName + ".ntb values('2021-04-01 08:00:00.001', 'abcd0123456789')('2021-04-01 08:00:00.002', 'abcd012345678901234567890123456789') \"" - retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') + retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", keyDict['c'], '', '', '') if retCode != "TAOS_OK": tdLog.exit("taos -w insert data fail") keyDict['s'] = "\"insert into " + newDbName + ".ntb values('2021-04-01 08:00:00.003', 'aaaaaaaaaaaaaaaaaaaa')('2021-04-01 08:00:01.004', 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb') \"" - retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') + retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", keyDict['c'], '', '', '') if retCode != "TAOS_OK": tdLog.exit("taos -w insert data fail") keyDict['s'] = "\"insert into " + newDbName + ".ntb values('2021-04-01 08:00:00.005', 'cccccccccccccccccccc')('2021-04-01 08:00:01.006', 'dddddddddddddddddddddddddddddddddddddddd') \"" - retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') + retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", keyDict['c'], '', '', '') if retCode != "TAOS_OK": tdLog.exit("taos -w insert data fail") From a976d3a4c10ed6fdad29976cdb8c9b713764af49 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Sat, 3 Dec 2022 17:48:47 +0800 Subject: [PATCH 12/12] fix(case): pexpect module expect return index --- tests/system-test/0-others/taosShell.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/0-others/taosShell.py b/tests/system-test/0-others/taosShell.py index bcf27800f2..5c7bb0443a 100644 --- a/tests/system-test/0-others/taosShell.py +++ b/tests/system-test/0-others/taosShell.py @@ -63,7 +63,7 @@ def taos_command (buildPath, key, value, expectString, cfgDir, sqlString='', key if len(sqlString) != 0: child.sendline (sqlString) w = child.expect(["Query OK", "Create OK", "Insert OK", "Drop OK", taosExpect.TIMEOUT, taosExpect.EOF], timeout=10) - if w == 0: + if w == 0 or w == 1 or w == 2: return "TAOS_OK" else: print(1)