Merge pull request #18646 from taosdata/fix/TD-20684-3.0
feat(api): add new api int64_t taos_affected_rows64
This commit is contained in:
commit
f9f60a8f39
|
@ -185,6 +185,7 @@ DLL_EXPORT void taos_kill_query(TAOS *taos);
|
||||||
DLL_EXPORT int taos_field_count(TAOS_RES *res);
|
DLL_EXPORT int taos_field_count(TAOS_RES *res);
|
||||||
DLL_EXPORT int taos_num_fields(TAOS_RES *res);
|
DLL_EXPORT int taos_num_fields(TAOS_RES *res);
|
||||||
DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
|
DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
|
||||||
|
DLL_EXPORT int64_t taos_affected_rows64(TAOS_RES *res);
|
||||||
|
|
||||||
DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res);
|
DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res);
|
||||||
DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
|
DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
|
||||||
|
|
|
@ -1407,8 +1407,8 @@ typedef struct {
|
||||||
int8_t streamBlockType;
|
int8_t streamBlockType;
|
||||||
int32_t compLen;
|
int32_t compLen;
|
||||||
int32_t numOfBlocks;
|
int32_t numOfBlocks;
|
||||||
int32_t numOfRows;
|
int64_t numOfRows; // from int32_t change to int64_t
|
||||||
int32_t numOfCols;
|
int64_t numOfCols;
|
||||||
int64_t skey;
|
int64_t skey;
|
||||||
int64_t ekey;
|
int64_t ekey;
|
||||||
int64_t version; // for stream
|
int64_t version; // for stream
|
||||||
|
|
|
@ -68,7 +68,7 @@ typedef struct SInputData {
|
||||||
|
|
||||||
typedef struct SOutputData {
|
typedef struct SOutputData {
|
||||||
int32_t numOfBlocks;
|
int32_t numOfBlocks;
|
||||||
int32_t numOfRows;
|
int64_t numOfRows; // int32_t changed to int64_t
|
||||||
int32_t numOfCols;
|
int32_t numOfCols;
|
||||||
int8_t compressed;
|
int8_t compressed;
|
||||||
char* pData;
|
char* pData;
|
||||||
|
|
|
@ -171,9 +171,9 @@ typedef struct SReqResultInfo {
|
||||||
char** convertBuf;
|
char** convertBuf;
|
||||||
TAOS_ROW row;
|
TAOS_ROW row;
|
||||||
SResultColumn* pCol;
|
SResultColumn* pCol;
|
||||||
uint32_t numOfRows;
|
uint64_t numOfRows; // from int32_t change to int64_t
|
||||||
uint64_t totalRows;
|
uint64_t totalRows;
|
||||||
uint32_t current;
|
uint64_t current;
|
||||||
bool localResultFetched;
|
bool localResultFetched;
|
||||||
bool completed;
|
bool completed;
|
||||||
int32_t precision;
|
int32_t precision;
|
||||||
|
|
|
@ -317,7 +317,7 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
|
||||||
tscError("0x%" PRIx64 " fetch results failed, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code),
|
tscError("0x%" PRIx64 " fetch results failed, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code),
|
||||||
pRequest->requestId);
|
pRequest->requestId);
|
||||||
} else {
|
} else {
|
||||||
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
|
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
|
||||||
pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
|
pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
|
||||||
pRequest->requestId);
|
pRequest->requestId);
|
||||||
}
|
}
|
||||||
|
@ -1527,7 +1527,7 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
|
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
|
||||||
pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
|
pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
|
||||||
|
|
||||||
STscObj* pTscObj = pRequest->pTscObj;
|
STscObj* pTscObj = pRequest->pTscObj;
|
||||||
|
@ -1941,7 +1941,7 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
|
||||||
|
|
||||||
pResultInfo->pRspMsg = (const char*)pRsp;
|
pResultInfo->pRspMsg = (const char*)pRsp;
|
||||||
pResultInfo->pData = (void*)pRsp->data;
|
pResultInfo->pData = (void*)pRsp->data;
|
||||||
pResultInfo->numOfRows = htonl(pRsp->numOfRows);
|
pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
|
||||||
pResultInfo->current = 0;
|
pResultInfo->current = 0;
|
||||||
pResultInfo->completed = (pRsp->completed == 1);
|
pResultInfo->completed = (pRsp->completed == 1);
|
||||||
pResultInfo->payloadLen = htonl(pRsp->compLen);
|
pResultInfo->payloadLen = htonl(pRsp->compLen);
|
||||||
|
|
|
@ -435,11 +435,23 @@ const char *taos_data_type(int type) {
|
||||||
|
|
||||||
const char *taos_get_client_info() { return version; }
|
const char *taos_get_client_info() { return version; }
|
||||||
|
|
||||||
|
// return int32_t
|
||||||
int taos_affected_rows(TAOS_RES *res) {
|
int taos_affected_rows(TAOS_RES *res) {
|
||||||
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) {
|
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SRequestObj *pRequest = (SRequestObj *)res;
|
||||||
|
SReqResultInfo *pResInfo = &pRequest->body.resInfo;
|
||||||
|
return (int)pResInfo->numOfRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
// return int64_t
|
||||||
|
int64_t taos_affected_rows64(TAOS_RES *res) {
|
||||||
|
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
SRequestObj *pRequest = (SRequestObj *)res;
|
SRequestObj *pRequest = (SRequestObj *)res;
|
||||||
SReqResultInfo *pResInfo = &pRequest->body.resInfo;
|
SReqResultInfo *pResInfo = &pRequest->body.resInfo;
|
||||||
return pResInfo->numOfRows;
|
return pResInfo->numOfRows;
|
||||||
|
@ -956,7 +968,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
|
||||||
tscError("0x%" PRIx64 " fetch results failed, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code),
|
tscError("0x%" PRIx64 " fetch results failed, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code),
|
||||||
pRequest->requestId);
|
pRequest->requestId);
|
||||||
} else {
|
} else {
|
||||||
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
|
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
|
||||||
pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
|
pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
|
||||||
pRequest->requestId);
|
pRequest->requestId);
|
||||||
|
|
||||||
|
|
|
@ -450,7 +450,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
|
||||||
(*pRsp)->precision = 0;
|
(*pRsp)->precision = 0;
|
||||||
(*pRsp)->compressed = 0;
|
(*pRsp)->compressed = 0;
|
||||||
(*pRsp)->compLen = 0;
|
(*pRsp)->compLen = 0;
|
||||||
(*pRsp)->numOfRows = htonl(pBlock->info.rows);
|
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
|
||||||
(*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
|
(*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
|
||||||
|
|
||||||
int32_t len = blockEncode(pBlock, (*pRsp)->data, SHOW_VARIABLES_RESULT_COLS);
|
int32_t len = blockEncode(pBlock, (*pRsp)->data, SHOW_VARIABLES_RESULT_COLS);
|
||||||
|
|
|
@ -25,7 +25,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
|
||||||
pRetrieve->precision = precision;
|
pRetrieve->precision = precision;
|
||||||
pRetrieve->compressed = 0;
|
pRetrieve->compressed = 0;
|
||||||
pRetrieve->completed = 1;
|
pRetrieve->completed = 1;
|
||||||
pRetrieve->numOfRows = htonl(pBlock->info.rows);
|
pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
|
||||||
|
|
||||||
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
|
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
|
||||||
actualLen += sizeof(SRetrieveTableRsp);
|
actualLen += sizeof(SRetrieveTableRsp);
|
||||||
|
|
|
@ -36,7 +36,7 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe
|
||||||
(*pRsp)->precision = 0;
|
(*pRsp)->precision = 0;
|
||||||
(*pRsp)->compressed = 0;
|
(*pRsp)->compressed = 0;
|
||||||
(*pRsp)->compLen = 0;
|
(*pRsp)->compLen = 0;
|
||||||
(*pRsp)->numOfRows = htonl(pBlock->info.rows);
|
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
|
||||||
(*pRsp)->numOfCols = htonl(numOfCols);
|
(*pRsp)->numOfCols = htonl(numOfCols);
|
||||||
|
|
||||||
int32_t len = blockEncode(pBlock, (*pRsp)->data, numOfCols);
|
int32_t len = blockEncode(pBlock, (*pRsp)->data, numOfCols);
|
||||||
|
|
|
@ -1663,7 +1663,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
rsp->completed = 1;
|
rsp->completed = 1;
|
||||||
rsp->numOfRows = htonl(rowNum);
|
rsp->numOfRows = htobe64((int64_t)rowNum);
|
||||||
|
|
||||||
int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock));
|
int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock));
|
||||||
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
|
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));
|
||||||
|
|
|
@ -707,7 +707,7 @@ void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pC
|
||||||
int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
|
int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
|
||||||
|
|
||||||
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart);
|
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart);
|
||||||
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs,
|
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
|
||||||
SOperatorInfo* pOperator);
|
SOperatorInfo* pOperator);
|
||||||
|
|
||||||
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
|
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
|
||||||
|
|
|
@ -115,14 +115,14 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
|
||||||
if (pRsp->completed == 1) {
|
if (pRsp->completed == 1) {
|
||||||
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
|
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
|
||||||
" execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
|
" execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
|
||||||
", total:%.2f Kb, try next %d/%" PRIzu,
|
", total:%.2f Kb, try next %d/%" PRIzu,
|
||||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
|
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
|
||||||
pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, i + 1,
|
pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, i + 1,
|
||||||
totalSources);
|
totalSources);
|
||||||
} else {
|
} else {
|
||||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
|
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
|
||||||
" execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb",
|
" execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
|
||||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks,
|
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks,
|
||||||
pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
|
pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
|
||||||
}
|
}
|
||||||
|
@ -367,14 +367,14 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
pSourceDataInfo->pRsp = pMsg->pData;
|
pSourceDataInfo->pRsp = pMsg->pData;
|
||||||
|
|
||||||
SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
|
SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
|
||||||
pRsp->numOfRows = htonl(pRsp->numOfRows);
|
pRsp->numOfRows = htobe64(pRsp->numOfRows);
|
||||||
pRsp->compLen = htonl(pRsp->compLen);
|
pRsp->compLen = htonl(pRsp->compLen);
|
||||||
pRsp->numOfCols = htonl(pRsp->numOfCols);
|
pRsp->numOfCols = htonl(pRsp->numOfCols);
|
||||||
pRsp->useconds = htobe64(pRsp->useconds);
|
pRsp->useconds = htobe64(pRsp->useconds);
|
||||||
pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
|
pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
|
||||||
|
|
||||||
ASSERT(pRsp != NULL);
|
ASSERT(pRsp != NULL);
|
||||||
qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d, %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks,
|
qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks,
|
||||||
pRsp->numOfRows, pExchangeInfo);
|
pRsp->numOfRows, pExchangeInfo);
|
||||||
} else {
|
} else {
|
||||||
taosMemoryFree(pMsg->pData);
|
taosMemoryFree(pMsg->pData);
|
||||||
|
@ -472,7 +472,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs,
|
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
|
||||||
SOperatorInfo* pOperator) {
|
SOperatorInfo* pOperator) {
|
||||||
pInfo->totalRows += numOfRows;
|
pInfo->totalRows += numOfRows;
|
||||||
pInfo->totalSize += dataLen;
|
pInfo->totalSize += dataLen;
|
||||||
|
@ -652,7 +652,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
|
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
|
||||||
if (pRsp->completed == 1) {
|
if (pRsp->completed == 1) {
|
||||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64
|
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
|
||||||
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
|
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
|
||||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
|
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
|
||||||
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
|
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
|
||||||
|
@ -661,7 +661,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
|
||||||
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||||
pExchangeInfo->current += 1;
|
pExchangeInfo->current += 1;
|
||||||
} else {
|
} else {
|
||||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
|
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64 ", totalRows:%" PRIu64
|
||||||
", totalBytes:%" PRIu64,
|
", totalBytes:%" PRIu64,
|
||||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
|
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
|
||||||
pLoadInfo->totalRows, pLoadInfo->totalSize);
|
pLoadInfo->totalRows, pLoadInfo->totalSize);
|
||||||
|
|
|
@ -37,7 +37,7 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete)
|
||||||
rsp->precision = input->precision;
|
rsp->precision = input->precision;
|
||||||
rsp->compressed = input->compressed;
|
rsp->compressed = input->compressed;
|
||||||
rsp->compLen = htonl(len);
|
rsp->compLen = htonl(len);
|
||||||
rsp->numOfRows = htonl(input->numOfRows);
|
rsp->numOfRows = htobe64(input->numOfRows);
|
||||||
rsp->numOfCols = htonl(input->numOfCols);
|
rsp->numOfCols = htonl(input->numOfCols);
|
||||||
rsp->numOfBlocks = htonl(input->numOfBlocks);
|
rsp->numOfBlocks = htonl(input->numOfBlocks);
|
||||||
}
|
}
|
||||||
|
|
|
@ -275,7 +275,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
|
||||||
QW_ERR_RET(code);
|
QW_ERR_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %d", pOutput->numOfBlocks,
|
QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %"PRId64, pOutput->numOfBlocks,
|
||||||
pOutput->numOfRows);
|
pOutput->numOfRows);
|
||||||
|
|
||||||
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
|
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
|
||||||
|
@ -320,19 +320,19 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
|
||||||
pOutput->numOfBlocks++;
|
pOutput->numOfBlocks++;
|
||||||
|
|
||||||
if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
|
if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
|
||||||
QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %d", pOutput->numOfBlocks,
|
QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks,
|
||||||
pOutput->numOfRows);
|
pOutput->numOfRows);
|
||||||
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
|
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (0 == ctx->level) {
|
if (0 == ctx->level) {
|
||||||
QW_TASK_DLOG("task fetched blocks %d rows %d, level %d", pOutput->numOfBlocks, pOutput->numOfRows, ctx->level);
|
QW_TASK_DLOG("task fetched blocks %d rows %"PRId64", level %d", pOutput->numOfBlocks, pOutput->numOfRows, ctx->level);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pOutput->numOfRows >= QW_MIN_RES_ROWS) {
|
if (pOutput->numOfRows >= QW_MIN_RES_ROWS) {
|
||||||
QW_TASK_DLOG("task fetched blocks %d rows %d reaches the min rows", pOutput->numOfBlocks, pOutput->numOfRows);
|
QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks, pOutput->numOfRows);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -297,7 +297,7 @@ typedef struct SSchJob {
|
||||||
SExecResult execRes;
|
SExecResult execRes;
|
||||||
void *fetchRes; // TODO free it or not
|
void *fetchRes; // TODO free it or not
|
||||||
bool fetched;
|
bool fetched;
|
||||||
int32_t resNumOfRows;
|
int64_t resNumOfRows; // from int32_t to int64_t
|
||||||
SSchResInfo userRes;
|
SSchResInfo userRes;
|
||||||
char *sql;
|
char *sql;
|
||||||
SQueryProfileSummary summary;
|
SQueryProfileSummary summary;
|
||||||
|
|
|
@ -412,7 +412,7 @@ int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) {
|
||||||
SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
|
SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_JOB_DLOG("fetch done, totalRows:%d", pJob->resNumOfRows);
|
SCH_JOB_DLOG("fetch done, totalRows:%" PRId64, pJob->resNumOfRows);
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
|
@ -526,9 +526,9 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
|
||||||
void schProcessOnDataFetched(SSchJob *pJob) { schPostJobRes(pJob, SCH_OP_FETCH); }
|
void schProcessOnDataFetched(SSchJob *pJob) { schPostJobRes(pJob, SCH_OP_FETCH); }
|
||||||
|
|
||||||
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
|
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
|
||||||
SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed);
|
SCH_TASK_DLOG("got explain rsp, rows:%" PRId64 ", complete:%d", htobe64(pRsp->numOfRows), pRsp->completed);
|
||||||
|
|
||||||
atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows));
|
atomic_store_64(&pJob->resNumOfRows, htobe64(pRsp->numOfRows));
|
||||||
atomic_store_ptr(&pJob->fetchRes, pRsp);
|
atomic_store_ptr(&pJob->fetchRes, pRsp);
|
||||||
|
|
||||||
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
|
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
|
||||||
|
|
|
@ -108,13 +108,13 @@ int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rs
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_store_ptr(&pJob->fetchRes, rsp);
|
atomic_store_ptr(&pJob->fetchRes, rsp);
|
||||||
atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows));
|
atomic_add_fetch_64(&pJob->resNumOfRows, htobe64(rsp->numOfRows));
|
||||||
|
|
||||||
if (rsp->completed) {
|
if (rsp->completed) {
|
||||||
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
|
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed);
|
SCH_TASK_DLOG("got fetch rsp, rows:%" PRId64 ", complete:%d", htobe64(rsp->numOfRows), rsp->completed);
|
||||||
|
|
||||||
msg = NULL;
|
msg = NULL;
|
||||||
schProcessOnDataFetched(pJob);
|
schProcessOnDataFetched(pJob);
|
||||||
|
@ -279,7 +279,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows);
|
atomic_add_fetch_64(&pJob->resNumOfRows, rsp->affectedRows);
|
||||||
SCH_TASK_DLOG("submit succeed, affectedRows:%d, blocks:%d", rsp->affectedRows, rsp->nBlocks);
|
SCH_TASK_DLOG("submit succeed, affectedRows:%d, blocks:%d", rsp->affectedRows, rsp->nBlocks);
|
||||||
|
|
||||||
SCH_LOCK(SCH_WRITE, &pJob->resLock);
|
SCH_LOCK(SCH_WRITE, &pJob->resLock);
|
||||||
|
@ -317,7 +317,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
|
||||||
tDecodeSVDeleteRsp(&coder, &rsp);
|
tDecodeSVDeleteRsp(&coder, &rsp);
|
||||||
tDecoderClear(&coder);
|
tDecoderClear(&coder);
|
||||||
|
|
||||||
atomic_add_fetch_32(&pJob->resNumOfRows, rsp.affectedRows);
|
atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows);
|
||||||
SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows);
|
SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -344,7 +344,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
|
||||||
|
|
||||||
SCH_ERR_JRET(schSaveJobExecRes(pJob, &rsp));
|
SCH_ERR_JRET(schSaveJobExecRes(pJob, &rsp));
|
||||||
|
|
||||||
atomic_add_fetch_32(&pJob->resNumOfRows, rsp.affectedRows);
|
atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows);
|
||||||
|
|
||||||
taosMemoryFreeClear(msg);
|
taosMemoryFreeClear(msg);
|
||||||
|
|
||||||
|
|
|
@ -112,7 +112,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
|
||||||
pRetrieve->compressed = 0;
|
pRetrieve->compressed = 0;
|
||||||
pRetrieve->completed = 1;
|
pRetrieve->completed = 1;
|
||||||
pRetrieve->streamBlockType = pBlock->info.type;
|
pRetrieve->streamBlockType = pBlock->info.type;
|
||||||
pRetrieve->numOfRows = htonl(pBlock->info.rows);
|
pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
|
||||||
pRetrieve->numOfCols = htonl(numOfCols);
|
pRetrieve->numOfCols = htonl(numOfCols);
|
||||||
pRetrieve->skey = htobe64(pBlock->info.window.skey);
|
pRetrieve->skey = htobe64(pBlock->info.window.skey);
|
||||||
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
|
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
|
||||||
|
@ -189,7 +189,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
|
||||||
pRetrieve->compressed = 0;
|
pRetrieve->compressed = 0;
|
||||||
pRetrieve->completed = 1;
|
pRetrieve->completed = 1;
|
||||||
pRetrieve->streamBlockType = pBlock->info.type;
|
pRetrieve->streamBlockType = pBlock->info.type;
|
||||||
pRetrieve->numOfRows = htonl(pBlock->info.rows);
|
pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
|
||||||
pRetrieve->skey = htobe64(pBlock->info.window.skey);
|
pRetrieve->skey = htobe64(pBlock->info.window.skey);
|
||||||
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
|
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
|
||||||
pRetrieve->version = htobe64(pBlock->info.version);
|
pRetrieve->version = htobe64(pBlock->info.version);
|
||||||
|
|
|
@ -62,8 +62,8 @@ def taos_command (buildPath, key, value, expectString, cfgDir, sqlString='', key
|
||||||
print ('taos login success! Here can run sql, taos> ')
|
print ('taos login success! Here can run sql, taos> ')
|
||||||
if len(sqlString) != 0:
|
if len(sqlString) != 0:
|
||||||
child.sendline (sqlString)
|
child.sendline (sqlString)
|
||||||
w = child.expect(["Query OK", taosExpect.TIMEOUT, taosExpect.EOF], timeout=10)
|
w = child.expect(["Query OK", "Create OK", "Insert OK", "Drop OK", taosExpect.TIMEOUT, taosExpect.EOF], timeout=10)
|
||||||
if w == 0:
|
if w == 0 or w == 1 or w == 2:
|
||||||
return "TAOS_OK"
|
return "TAOS_OK"
|
||||||
else:
|
else:
|
||||||
print(1)
|
print(1)
|
||||||
|
@ -233,7 +233,7 @@ class TDTestCase:
|
||||||
tdLog.printNoPrefix("================================ parameter: -s")
|
tdLog.printNoPrefix("================================ parameter: -s")
|
||||||
newDbName="dbss"
|
newDbName="dbss"
|
||||||
keyDict['s'] = "\"create database " + newDbName + "\""
|
keyDict['s'] = "\"create database " + newDbName + "\""
|
||||||
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '')
|
retCode = taos_command(buildPath, "s", keyDict['s'], "Create OK", keyDict['c'], '', '', '')
|
||||||
if retCode != "TAOS_OK":
|
if retCode != "TAOS_OK":
|
||||||
tdLog.exit("taos -s fail")
|
tdLog.exit("taos -s fail")
|
||||||
|
|
||||||
|
@ -246,17 +246,17 @@ class TDTestCase:
|
||||||
tdLog.exit("create db fail after taos -s %s fail"%(keyDict['s']))
|
tdLog.exit("create db fail after taos -s %s fail"%(keyDict['s']))
|
||||||
|
|
||||||
keyDict['s'] = "\"create table " + newDbName + ".stb (ts timestamp, c int) tags (t int)\""
|
keyDict['s'] = "\"create table " + newDbName + ".stb (ts timestamp, c int) tags (t int)\""
|
||||||
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '')
|
retCode = taos_command(buildPath, "s", keyDict['s'], "Create OK", keyDict['c'], '', '', '')
|
||||||
if retCode != "TAOS_OK":
|
if retCode != "TAOS_OK":
|
||||||
tdLog.exit("taos -s create table fail")
|
tdLog.exit("taos -s create table fail")
|
||||||
|
|
||||||
keyDict['s'] = "\"create table " + newDbName + ".ctb0 using " + newDbName + ".stb tags (0) " + newDbName + ".ctb1 using " + newDbName + ".stb tags (1)\""
|
keyDict['s'] = "\"create table " + newDbName + ".ctb0 using " + newDbName + ".stb tags (0) " + newDbName + ".ctb1 using " + newDbName + ".stb tags (1)\""
|
||||||
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '')
|
retCode = taos_command(buildPath, "s", keyDict['s'], "Create OK", keyDict['c'], '', '', '')
|
||||||
if retCode != "TAOS_OK":
|
if retCode != "TAOS_OK":
|
||||||
tdLog.exit("taos -s create table fail")
|
tdLog.exit("taos -s create table fail")
|
||||||
|
|
||||||
keyDict['s'] = "\"insert into " + newDbName + ".ctb0 values('2021-04-01 08:00:00.000', 10)('2021-04-01 08:00:01.000', 20) " + newDbName + ".ctb1 values('2021-04-01 08:00:00.000', 11)('2021-04-01 08:00:01.000', 21)\""
|
keyDict['s'] = "\"insert into " + newDbName + ".ctb0 values('2021-04-01 08:00:00.000', 10)('2021-04-01 08:00:01.000', 20) " + newDbName + ".ctb1 values('2021-04-01 08:00:00.000', 11)('2021-04-01 08:00:01.000', 21)\""
|
||||||
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '')
|
retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", keyDict['c'], '', '', '')
|
||||||
if retCode != "TAOS_OK":
|
if retCode != "TAOS_OK":
|
||||||
tdLog.exit("taos -s insert data fail")
|
tdLog.exit("taos -s insert data fail")
|
||||||
|
|
||||||
|
@ -401,27 +401,27 @@ class TDTestCase:
|
||||||
tdLog.printNoPrefix("================================ parameter: -w")
|
tdLog.printNoPrefix("================================ parameter: -w")
|
||||||
newDbName="dbw"
|
newDbName="dbw"
|
||||||
keyDict['s'] = "\"create database " + newDbName + "\""
|
keyDict['s'] = "\"create database " + newDbName + "\""
|
||||||
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '')
|
retCode = taos_command(buildPath, "s", keyDict['s'], "Create OK", keyDict['c'], '', '', '')
|
||||||
if retCode != "TAOS_OK":
|
if retCode != "TAOS_OK":
|
||||||
tdLog.exit("taos -w fail")
|
tdLog.exit("taos -w fail")
|
||||||
|
|
||||||
keyDict['s'] = "\"create table " + newDbName + ".ntb (ts timestamp, c binary(128))\""
|
keyDict['s'] = "\"create table " + newDbName + ".ntb (ts timestamp, c binary(128))\""
|
||||||
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '')
|
retCode = taos_command(buildPath, "s", keyDict['s'], "Create OK", keyDict['c'], '', '', '')
|
||||||
if retCode != "TAOS_OK":
|
if retCode != "TAOS_OK":
|
||||||
tdLog.exit("taos -w create table fail")
|
tdLog.exit("taos -w create table fail")
|
||||||
|
|
||||||
keyDict['s'] = "\"insert into " + newDbName + ".ntb values('2021-04-01 08:00:00.001', 'abcd0123456789')('2021-04-01 08:00:00.002', 'abcd012345678901234567890123456789') \""
|
keyDict['s'] = "\"insert into " + newDbName + ".ntb values('2021-04-01 08:00:00.001', 'abcd0123456789')('2021-04-01 08:00:00.002', 'abcd012345678901234567890123456789') \""
|
||||||
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '')
|
retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", keyDict['c'], '', '', '')
|
||||||
if retCode != "TAOS_OK":
|
if retCode != "TAOS_OK":
|
||||||
tdLog.exit("taos -w insert data fail")
|
tdLog.exit("taos -w insert data fail")
|
||||||
|
|
||||||
keyDict['s'] = "\"insert into " + newDbName + ".ntb values('2021-04-01 08:00:00.003', 'aaaaaaaaaaaaaaaaaaaa')('2021-04-01 08:00:01.004', 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb') \""
|
keyDict['s'] = "\"insert into " + newDbName + ".ntb values('2021-04-01 08:00:00.003', 'aaaaaaaaaaaaaaaaaaaa')('2021-04-01 08:00:01.004', 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb') \""
|
||||||
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '')
|
retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", keyDict['c'], '', '', '')
|
||||||
if retCode != "TAOS_OK":
|
if retCode != "TAOS_OK":
|
||||||
tdLog.exit("taos -w insert data fail")
|
tdLog.exit("taos -w insert data fail")
|
||||||
|
|
||||||
keyDict['s'] = "\"insert into " + newDbName + ".ntb values('2021-04-01 08:00:00.005', 'cccccccccccccccccccc')('2021-04-01 08:00:01.006', 'dddddddddddddddddddddddddddddddddddddddd') \""
|
keyDict['s'] = "\"insert into " + newDbName + ".ntb values('2021-04-01 08:00:00.005', 'cccccccccccccccccccc')('2021-04-01 08:00:01.006', 'dddddddddddddddddddddddddddddddddddddddd') \""
|
||||||
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '')
|
retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", keyDict['c'], '', '', '')
|
||||||
if retCode != "TAOS_OK":
|
if retCode != "TAOS_OK":
|
||||||
tdLog.exit("taos -w insert data fail")
|
tdLog.exit("taos -w insert data fail")
|
||||||
|
|
||||||
|
|
|
@ -214,6 +214,18 @@ void shellRunSingleCommandImp(char *command) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// pre string
|
||||||
|
char * pre = "Query OK";
|
||||||
|
if (shellRegexMatch(command, "^\\s*delete\\s*from\\s*.*", REG_EXTENDED | REG_ICASE)) {
|
||||||
|
pre = "Delete OK";
|
||||||
|
} else if(shellRegexMatch(command, "^\\s*insert\\s*into\\s*.*", REG_EXTENDED | REG_ICASE)) {
|
||||||
|
pre = "Insert OK";
|
||||||
|
} else if(shellRegexMatch(command, "^\\s*create\\s*.*", REG_EXTENDED | REG_ICASE)) {
|
||||||
|
pre = "Create OK";
|
||||||
|
} else if(shellRegexMatch(command, "^\\s*drop\\s*.*", REG_EXTENDED | REG_ICASE)) {
|
||||||
|
pre = "Drop OK";
|
||||||
|
}
|
||||||
|
|
||||||
TAOS_FIELD *pFields = taos_fetch_fields(pSql);
|
TAOS_FIELD *pFields = taos_fetch_fields(pSql);
|
||||||
if (pFields != NULL) { // select and show kinds of commands
|
if (pFields != NULL) { // select and show kinds of commands
|
||||||
int32_t error_no = 0;
|
int32_t error_no = 0;
|
||||||
|
@ -229,10 +241,10 @@ void shellRunSingleCommandImp(char *command) {
|
||||||
}
|
}
|
||||||
taos_free_result(pSql);
|
taos_free_result(pSql);
|
||||||
} else {
|
} else {
|
||||||
int32_t num_rows_affacted = taos_affected_rows(pSql);
|
int64_t num_rows_affacted = taos_affected_rows64(pSql);
|
||||||
taos_free_result(pSql);
|
taos_free_result(pSql);
|
||||||
et = taosGetTimestampUs();
|
et = taosGetTimestampUs();
|
||||||
printf("Query OK, %d row(s) affected (%.6fs)\r\n", num_rows_affacted, (et - st) / 1E6);
|
printf("%s, %" PRId64 " row(s) affected (%.6fs)\r\n", pre, num_rows_affacted, (et - st) / 1E6);
|
||||||
|
|
||||||
// call auto tab
|
// call auto tab
|
||||||
callbackAutoTab(command, NULL, false);
|
callbackAutoTab(command, NULL, false);
|
||||||
|
|
Loading…
Reference in New Issue