Merge branch '3.0' into fix/TD-20904-3.0

This commit is contained in:
kailixu 2022-12-04 17:50:35 +08:00
commit 0d68190378
28 changed files with 290 additions and 175 deletions

View File

@ -185,6 +185,7 @@ DLL_EXPORT void taos_kill_query(TAOS *taos);
DLL_EXPORT int taos_field_count(TAOS_RES *res); DLL_EXPORT int taos_field_count(TAOS_RES *res);
DLL_EXPORT int taos_num_fields(TAOS_RES *res); DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT int taos_affected_rows(TAOS_RES *res); DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
DLL_EXPORT int64_t taos_affected_rows64(TAOS_RES *res);
DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res); DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res);
DLL_EXPORT int taos_select_db(TAOS *taos, const char *db); DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);

View File

@ -1407,8 +1407,8 @@ typedef struct {
int8_t streamBlockType; int8_t streamBlockType;
int32_t compLen; int32_t compLen;
int32_t numOfBlocks; int32_t numOfBlocks;
int32_t numOfRows; int64_t numOfRows; // from int32_t change to int64_t
int32_t numOfCols; int64_t numOfCols;
int64_t skey; int64_t skey;
int64_t ekey; int64_t ekey;
int64_t version; // for stream int64_t version; // for stream

View File

@ -68,7 +68,7 @@ typedef struct SInputData {
typedef struct SOutputData { typedef struct SOutputData {
int32_t numOfBlocks; int32_t numOfBlocks;
int32_t numOfRows; int64_t numOfRows; // int32_t changed to int64_t
int32_t numOfCols; int32_t numOfCols;
int8_t compressed; int8_t compressed;
char* pData; char* pData;

View File

@ -129,6 +129,7 @@ typedef struct SDBVgInfo {
int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT
int64_t stateTs; int64_t stateTs;
SHashObj* vgHash; // key:vgId, value:SVgroupInfo SHashObj* vgHash; // key:vgId, value:SVgroupInfo
SArray* vgArray;
} SDBVgInfo; } SDBVgInfo;
typedef struct SUseDbOutput { typedef struct SUseDbOutput {
@ -238,6 +239,7 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
char* parseTagDatatoJson(void* p); char* parseTagDatatoJson(void* p);
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst); int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst);
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst); int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst);
void freeVgInfo(SDBVgInfo* vgInfo);
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen, extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen,
void* (*mallocFp)(int64_t)); void* (*mallocFp)(int64_t));

View File

@ -171,9 +171,9 @@ typedef struct SReqResultInfo {
char** convertBuf; char** convertBuf;
TAOS_ROW row; TAOS_ROW row;
SResultColumn* pCol; SResultColumn* pCol;
uint32_t numOfRows; uint64_t numOfRows; // from int32_t change to int64_t
uint64_t totalRows; uint64_t totalRows;
uint32_t current; uint64_t current;
bool localResultFetched; bool localResultFetched;
bool completed; bool completed;
int32_t precision; int32_t precision;

View File

@ -317,7 +317,7 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
tscError("0x%" PRIx64 " fetch results failed, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code), tscError("0x%" PRIx64 " fetch results failed, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code),
pRequest->requestId); pRequest->requestId);
} else { } else {
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
pRequest->requestId); pRequest->requestId);
} }
@ -1527,7 +1527,7 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
return NULL; return NULL;
} }
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId); pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
@ -1941,7 +1941,7 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
pResultInfo->pRspMsg = (const char*)pRsp; pResultInfo->pRspMsg = (const char*)pRsp;
pResultInfo->pData = (void*)pRsp->data; pResultInfo->pData = (void*)pRsp->data;
pResultInfo->numOfRows = htonl(pRsp->numOfRows); pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
pResultInfo->current = 0; pResultInfo->current = 0;
pResultInfo->completed = (pRsp->completed == 1); pResultInfo->completed = (pRsp->completed == 1);
pResultInfo->payloadLen = htonl(pRsp->compLen); pResultInfo->payloadLen = htonl(pRsp->compLen);

View File

@ -435,11 +435,23 @@ const char *taos_data_type(int type) {
const char *taos_get_client_info() { return version; } const char *taos_get_client_info() { return version; }
// return int32_t
int taos_affected_rows(TAOS_RES *res) { int taos_affected_rows(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) { if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) {
return 0; return 0;
} }
SRequestObj *pRequest = (SRequestObj *)res;
SReqResultInfo *pResInfo = &pRequest->body.resInfo;
return (int)pResInfo->numOfRows;
}
// return int64_t
int64_t taos_affected_rows64(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) {
return 0;
}
SRequestObj *pRequest = (SRequestObj *)res; SRequestObj *pRequest = (SRequestObj *)res;
SReqResultInfo *pResInfo = &pRequest->body.resInfo; SReqResultInfo *pResInfo = &pRequest->body.resInfo;
return pResInfo->numOfRows; return pResInfo->numOfRows;
@ -956,7 +968,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
tscError("0x%" PRIx64 " fetch results failed, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code), tscError("0x%" PRIx64 " fetch results failed, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code),
pRequest->requestId); pRequest->requestId);
} else { } else {
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
pRequest->requestId); pRequest->requestId);

View File

@ -450,7 +450,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
(*pRsp)->precision = 0; (*pRsp)->precision = 0;
(*pRsp)->compressed = 0; (*pRsp)->compressed = 0;
(*pRsp)->compLen = 0; (*pRsp)->compLen = 0;
(*pRsp)->numOfRows = htonl(pBlock->info.rows); (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
(*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS); (*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
int32_t len = blockEncode(pBlock, (*pRsp)->data, SHOW_VARIABLES_RESULT_COLS); int32_t len = blockEncode(pBlock, (*pRsp)->data, SHOW_VARIABLES_RESULT_COLS);

View File

@ -25,7 +25,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
pRetrieve->precision = precision; pRetrieve->precision = precision;
pRetrieve->compressed = 0; pRetrieve->compressed = 0;
pRetrieve->completed = 1; pRetrieve->completed = 1;
pRetrieve->numOfRows = htonl(pBlock->info.rows); pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
actualLen += sizeof(SRetrieveTableRsp); actualLen += sizeof(SRetrieveTableRsp);

View File

@ -762,7 +762,6 @@ int32_t ctgCloneMetaOutput(STableMetaOutput* output, STableMetaOutput** pOutput)
int32_t ctgGenerateVgList(SCatalog* pCtg, SHashObj* vgHash, SArray** pList); int32_t ctgGenerateVgList(SCatalog* pCtg, SHashObj* vgHash, SArray** pList);
void ctgFreeJob(void* job); void ctgFreeJob(void* job);
void ctgFreeHandleImpl(SCatalog* pCtg); void ctgFreeHandleImpl(SCatalog* pCtg);
void ctgFreeVgInfo(SDBVgInfo* vgInfo);
int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName* pTableName, SVgroupInfo* pVgroup); int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName* pTableName, SVgroupInfo* pVgroup);
int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo* dbInfo, SCtgTbHashsCtx* pCtx, int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo* dbInfo, SCtgTbHashsCtx* pCtx,
char* dbFName, SArray* pNames, bool update); char* dbFName, SArray* pNames, bool update);
@ -789,6 +788,8 @@ int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName);
int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists); int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists);
SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch); SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch);
int32_t ctgdGetOneHandle(SCatalog **pHandle); int32_t ctgdGetOneHandle(SCatalog **pHandle);
int ctgVgInfoComp(const void* lp, const void* rp);
int32_t ctgMakeVgArray(SDBVgInfo* dbInfo);
extern SCatalogMgmt gCtgMgmt; extern SCatalogMgmt gCtgMgmt;
extern SCtgDebug gCTGDebug; extern SCtgDebug gCTGDebug;

View File

@ -505,8 +505,7 @@ _return:
taosMemoryFreeClear(tbMeta); taosMemoryFreeClear(tbMeta);
if (vgInfo) { if (vgInfo) {
taosHashCleanup(vgInfo->vgHash); freeVgInfo(vgInfo);
taosMemoryFreeClear(vgInfo);
} }
if (vgList) { if (vgList) {
@ -546,8 +545,7 @@ _return:
} }
if (vgInfo) { if (vgInfo) {
taosHashCleanup(vgInfo->vgHash); freeVgInfo(vgInfo);
taosMemoryFreeClear(vgInfo);
} }
CTG_RET(code); CTG_RET(code);
@ -778,8 +776,7 @@ _return:
} }
if (vgInfo) { if (vgInfo) {
taosHashCleanup(vgInfo->vgHash); freeVgInfo(vgInfo);
taosMemoryFreeClear(vgInfo);
} }
CTG_API_LEAVE(code); CTG_API_LEAVE(code);
@ -836,8 +833,7 @@ _return:
ctgRUnlockVgInfo(dbCache); ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache); ctgReleaseDBCache(pCtg, dbCache);
} else if (dbInfo) { } else if (dbInfo) {
taosHashCleanup(dbInfo->vgHash); freeVgInfo(dbInfo);
taosMemoryFreeClear(dbInfo);
} }
CTG_API_LEAVE(code); CTG_API_LEAVE(code);
@ -849,7 +845,7 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId
int32_t code = 0; int32_t code = 0;
if (NULL == pCtg || NULL == dbFName || NULL == dbInfo) { if (NULL == pCtg || NULL == dbFName || NULL == dbInfo) {
ctgFreeVgInfo(dbInfo); freeVgInfo(dbInfo);
CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT); CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
} }

View File

@ -226,6 +226,7 @@ _return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
/*
int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache **pTb) { int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache **pTb) {
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
SCtgTbCache *pCache = NULL; SCtgTbCache *pCache = NULL;
@ -276,6 +277,49 @@ _return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
*/
int32_t ctgAcquireStbMetaFromCache(SCtgDBCache *dbCache, SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgTbCache **pTb) {
SCtgTbCache *pCache = NULL;
char *stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
if (NULL == stName) {
ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
goto _return;
}
pCache = taosHashAcquire(dbCache->tbCache, stName, strlen(stName));
if (NULL == pCache) {
ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", suid, stName, dbFName);
taosHashRelease(dbCache->stbCache, stName);
goto _return;
}
taosHashRelease(dbCache->stbCache, stName);
CTG_LOCK(CTG_READ, &pCache->metaLock);
if (NULL == pCache->pMeta) {
ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", suid, dbFName);
goto _return;
}
*pTb = pCache;
ctgDebug("stb 0x%" PRIx64 " meta got in cache, dbFName:%s", suid, dbFName);
CTG_CACHE_STAT_INC(numOfMetaHit, 1);
return TSDB_CODE_SUCCESS;
_return:
ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
CTG_CACHE_STAT_INC(numOfMetaMiss, 1);
*pTb = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t ctgAcquireTbIndexFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) { int32_t ctgAcquireTbIndexFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
@ -384,13 +428,19 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **
memcpy(*pTableMeta, tbMeta, metaSize); memcpy(*pTableMeta, tbMeta, metaSize);
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); //ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
if (tbCache) {
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
taosHashRelease(dbCache->tbCache, tbCache);
}
ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname, ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname,
ctx->tbInfo.tbType, dbFName); ctx->tbInfo.tbType, dbFName);
ctgAcquireStbMetaFromCache(pCtg, dbFName, ctx->tbInfo.suid, &dbCache, &tbCache); ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, ctx->tbInfo.suid, &tbCache);
if (NULL == tbCache) { if (NULL == tbCache) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); //ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
taosMemoryFreeClear(*pTableMeta); taosMemoryFreeClear(*pTableMeta);
ctgDebug("stb 0x%" PRIx64 " meta not in cache", ctx->tbInfo.suid); ctgDebug("stb 0x%" PRIx64 " meta not in cache", ctx->tbInfo.suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -460,12 +510,17 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver,
// PROCESS FOR CHILD TABLE // PROCESS FOR CHILD TABLE
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); //ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
if (tbCache) {
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
taosHashRelease(dbCache->tbCache, tbCache);
}
ctgDebug("Got ctb %s ver from cache, will continue to get its stb ver, dbFName:%s", pTableName->tname, dbFName); ctgDebug("Got ctb %s ver from cache, will continue to get its stb ver, dbFName:%s", pTableName->tname, dbFName);
ctgAcquireStbMetaFromCache(pCtg, dbFName, *suid, &dbCache, &tbCache); ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, *suid, &tbCache);
if (NULL == tbCache) { if (NULL == tbCache) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); //ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgDebug("stb 0x%" PRIx64 " meta not in cache", *suid); ctgDebug("stb 0x%" PRIx64 " meta not in cache", *suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -794,7 +849,7 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
taosMemoryFree(op); taosMemoryFree(op);
ctgFreeVgInfo(dbInfo); freeVgInfo(dbInfo);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
@ -803,6 +858,14 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId
dbFName = p + 1; dbFName = p + 1;
} }
code = ctgMakeVgArray(dbInfo);
if (code) {
taosMemoryFree(op);
taosMemoryFree(msg);
freeVgInfo(dbInfo);
CTG_ERR_RET(code);
}
tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
msg->pCtg = pCtg; msg->pCtg = pCtg;
msg->dbId = dbId; msg->dbId = dbId;
@ -816,7 +879,7 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId
_return: _return:
ctgFreeVgInfo(dbInfo); freeVgInfo(dbInfo);
CTG_RET(code); CTG_RET(code);
} }
@ -1549,6 +1612,20 @@ void ctgFreeAllInstance(void) {
taosHashClear(gCtgMgmt.pCluster); taosHashClear(gCtgMgmt.pCluster);
} }
int32_t ctgVgInfoIdComp(void const* lp, void const* rp) {
int32_t* key = (int32_t*)lp;
SVgroupInfo* pVg = (SVgroupInfo*)rp;
if (*key < pVg->vgId) {
return -1;
} else if (*key > pVg->vgId) {
return 1;
}
return 0;
}
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
int32_t code = 0; int32_t code = 0;
SCtgUpdateVgMsg *msg = operation->data; SCtgUpdateVgMsg *msg = operation->data;
@ -1600,7 +1677,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
goto _return; goto _return;
} }
ctgFreeVgInfo(vgInfo); freeVgInfo(vgInfo);
} }
vgCache->vgInfo = dbInfo; vgCache->vgInfo = dbInfo;
@ -1621,7 +1698,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
_return: _return:
ctgFreeVgInfo(msg->dbInfo); freeVgInfo(msg->dbInfo);
taosMemoryFreeClear(msg); taosMemoryFreeClear(msg);
CTG_RET(code); CTG_RET(code);
@ -1674,7 +1751,7 @@ int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache)); CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));
ctgFreeVgInfo(dbCache->vgCache.vgInfo); freeVgInfo(dbCache->vgCache.vgInfo);
dbCache->vgCache.vgInfo = NULL; dbCache->vgCache.vgInfo = NULL;
ctgDebug("db vgInfo removed, dbFName:%s", msg->dbFName); ctgDebug("db vgInfo removed, dbFName:%s", msg->dbFName);
@ -1930,7 +2007,13 @@ int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
SVgroupInfo *pInfo = taosHashGet(vgInfo->vgHash, &msg->vgId, sizeof(msg->vgId)); SVgroupInfo *pInfo = taosHashGet(vgInfo->vgHash, &msg->vgId, sizeof(msg->vgId));
if (NULL == pInfo) { if (NULL == pInfo) {
ctgDebug("no vgroup %d in db %s, ignore epset update", msg->vgId, msg->dbFName); ctgDebug("no vgroup %d in db %s vgHash, ignore epset update", msg->vgId, msg->dbFName);
goto _return;
}
SVgroupInfo *pInfo2 = taosArraySearch(vgInfo->vgArray, &msg->vgId, ctgVgInfoIdComp, TD_EQ);
if (NULL == pInfo2) {
ctgDebug("no vgroup %d in db %s vgArray, ignore epset update", msg->vgId, msg->dbFName);
goto _return; goto _return;
} }
@ -1941,6 +2024,7 @@ int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
msg->epSet.numOfEps, pNewEp->fqdn, pNewEp->port, msg->dbFName); msg->epSet.numOfEps, pNewEp->fqdn, pNewEp->port, msg->dbFName);
pInfo->epSet = msg->epSet; pInfo->epSet = msg->epSet;
pInfo2->epSet = msg->epSet;
_return: _return:
@ -2057,7 +2141,7 @@ void ctgFreeCacheOperationData(SCtgCacheOperation *op) {
switch (op->opId) { switch (op->opId) {
case CTG_OP_UPDATE_VGROUP: { case CTG_OP_UPDATE_VGROUP: {
SCtgUpdateVgMsg *msg = op->data; SCtgUpdateVgMsg *msg = op->data;
ctgFreeVgInfo(msg->dbInfo); freeVgInfo(msg->dbInfo);
taosMemoryFreeClear(op->data); taosMemoryFreeClear(op->data);
break; break;
} }

View File

@ -231,20 +231,7 @@ void ctgFreeTbCache(SCtgDBCache* dbCache) {
CTG_CACHE_STAT_DEC(numOfTbl, tblNum); CTG_CACHE_STAT_DEC(numOfTbl, tblNum);
} }
void ctgFreeVgInfo(SDBVgInfo* vgInfo) { void ctgFreeVgInfoCache(SCtgDBCache* dbCache) { freeVgInfo(dbCache->vgCache.vgInfo); }
if (NULL == vgInfo) {
return;
}
if (vgInfo->vgHash) {
taosHashCleanup(vgInfo->vgHash);
vgInfo->vgHash = NULL;
}
taosMemoryFreeClear(vgInfo);
}
void ctgFreeVgInfoCache(SCtgDBCache* dbCache) { ctgFreeVgInfo(dbCache->vgCache.vgInfo); }
void ctgFreeDbCache(SCtgDBCache* dbCache) { void ctgFreeDbCache(SCtgDBCache* dbCache) {
if (NULL == dbCache) { if (NULL == dbCache) {
@ -366,8 +353,7 @@ void ctgFreeSUseDbOutput(SUseDbOutput* pOutput) {
} }
if (pOutput->dbVgroup) { if (pOutput->dbVgroup) {
taosHashCleanup(pOutput->dbVgroup->vgHash); freeVgInfo(pOutput->dbVgroup);
taosMemoryFreeClear(pOutput->dbVgroup);
} }
taosMemoryFree(pOutput); taosMemoryFree(pOutput);
@ -573,8 +559,7 @@ void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void** pRes) {
case CTG_TASK_GET_DB_VGROUP: { case CTG_TASK_GET_DB_VGROUP: {
if (*pRes) { if (*pRes) {
SDBVgInfo* pInfo = (SDBVgInfo*)*pRes; SDBVgInfo* pInfo = (SDBVgInfo*)*pRes;
taosHashCleanup(pInfo->vgHash); freeVgInfo(pInfo);
taosMemoryFreeClear(*pRes);
} }
break; break;
} }
@ -837,10 +822,36 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int ctgVgInfoComp(const void* lp, const void* rp) {
SVgroupInfo* pLeft = (SVgroupInfo*)lp;
SVgroupInfo* pRight = (SVgroupInfo*)rp;
if (pLeft->hashBegin < pRight->hashBegin) {
return -1;
} else if (pLeft->hashBegin > pRight->hashBegin) {
return 1;
}
return 0;
}
int32_t ctgHashValueComp(void const* lp, void const* rp) {
uint32_t* key = (uint32_t*)lp;
SVgroupInfo* pVg = (SVgroupInfo*)rp;
if (*key < pVg->hashBegin) {
return -1;
} else if (*key > pVg->hashEnd) {
return 1;
}
return 0;
}
int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName* pTableName, SVgroupInfo* pVgroup) { int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName* pTableName, SVgroupInfo* pVgroup) {
int32_t code = 0; int32_t code = 0;
CTG_ERR_RET(ctgMakeVgArray(dbInfo));
int32_t vgNum = taosHashGetSize(dbInfo->vgHash); int32_t vgNum = taosArrayGetSize(dbInfo->vgArray);
char db[TSDB_DB_FNAME_LEN] = {0}; char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, db); tNameGetFullDbName(pTableName, db);
@ -856,6 +867,9 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName
uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod, uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
dbInfo->hashPrefix, dbInfo->hashSuffix); dbInfo->hashPrefix, dbInfo->hashSuffix);
vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, ctgHashValueComp, TD_EQ);
/*
void* pIter = taosHashIterate(dbInfo->vgHash, NULL); void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
while (pIter) { while (pIter) {
vgInfo = pIter; vgInfo = pIter;
@ -867,10 +881,11 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName
pIter = taosHashIterate(dbInfo->vgHash, pIter); pIter = taosHashIterate(dbInfo->vgHash, pIter);
vgInfo = NULL; vgInfo = NULL;
} }
*/
if (NULL == vgInfo) { if (NULL == vgInfo) {
ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, db, ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, db,
taosHashGetSize(dbInfo->vgHash)); (int32_t)taosArrayGetSize(dbInfo->vgArray));
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
@ -883,37 +898,15 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHashValueComp(void const* lp, void const* rp) {
uint32_t* key = (uint32_t*)lp;
SVgroupInfo* pVg = *(SVgroupInfo**)rp;
if (*key < pVg->hashBegin) {
return -1;
} else if (*key > pVg->hashEnd) {
return 1;
}
return 0;
}
int ctgVgInfoComp(const void* lp, const void* rp) {
SVgroupInfo* pLeft = *(SVgroupInfo**)lp;
SVgroupInfo* pRight = *(SVgroupInfo**)rp;
if (pLeft->hashBegin < pRight->hashBegin) {
return -1;
} else if (pLeft->hashBegin > pRight->hashBegin) {
return 1;
}
return 0;
}
int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo* dbInfo, SCtgTbHashsCtx* pCtx, int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo* dbInfo, SCtgTbHashsCtx* pCtx,
char* dbFName, SArray* pNames, bool update) { char* dbFName, SArray* pNames, bool update) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SMetaRes res = {0}; SMetaRes res = {0};
int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
CTG_ERR_RET(ctgMakeVgArray(dbInfo));
int32_t vgNum = taosArrayGetSize(dbInfo->vgArray);
if (vgNum <= 0) { if (vgNum <= 0) {
ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum); ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
@ -923,20 +916,13 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo*
int32_t tbNum = taosArrayGetSize(pNames); int32_t tbNum = taosArrayGetSize(pNames);
if (1 == vgNum) { if (1 == vgNum) {
void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
if (NULL == pIter) {
ctgError("empty vgHash, db:%s, vgroup number:%d", dbFName, vgNum);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
for (int32_t i = 0; i < tbNum; ++i) { for (int32_t i = 0; i < tbNum; ++i) {
vgInfo = taosMemoryMalloc(sizeof(SVgroupInfo)); vgInfo = taosMemoryMalloc(sizeof(SVgroupInfo));
if (NULL == vgInfo) { if (NULL == vgInfo) {
taosHashCancelIterate(dbInfo->vgHash, pIter);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
*vgInfo = *(SVgroupInfo*)pIter; *vgInfo = *(SVgroupInfo*)taosArrayGet(dbInfo->vgArray, 0);
ctgDebug("Got tb hash vgroup, vgId:%d, epNum %d, current %s port %d", vgInfo->vgId, vgInfo->epSet.numOfEps, ctgDebug("Got tb hash vgroup, vgId:%d, epNum %d, current %s port %d", vgInfo->vgId, vgInfo->epSet.numOfEps,
vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn, vgInfo->epSet.eps[vgInfo->epSet.inUse].port); vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn, vgInfo->epSet.eps[vgInfo->epSet.inUse].port);
@ -951,19 +937,9 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo*
} }
} }
taosHashCancelIterate(dbInfo->vgHash, pIter);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SArray* pVgList = taosArrayInit(vgNum, POINTER_BYTES);
void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
while (pIter) {
taosArrayPush(pVgList, &pIter);
pIter = taosHashIterate(dbInfo->vgHash, pIter);
}
taosArraySort(pVgList, ctgVgInfoComp);
char tbFullName[TSDB_TABLE_FNAME_LEN]; char tbFullName[TSDB_TABLE_FNAME_LEN];
sprintf(tbFullName, "%s.", dbFName); sprintf(tbFullName, "%s.", dbFName);
int32_t offset = strlen(tbFullName); int32_t offset = strlen(tbFullName);
@ -979,20 +955,15 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo*
uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod, uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
dbInfo->hashPrefix, dbInfo->hashSuffix); dbInfo->hashPrefix, dbInfo->hashSuffix);
SVgroupInfo** p = taosArraySearch(pVgList, &hashValue, ctgHashValueComp, TD_EQ); vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, ctgHashValueComp, TD_EQ);
if (NULL == vgInfo) {
if (NULL == p) {
ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName, ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
taosHashGetSize(dbInfo->vgHash)); (int32_t)taosArrayGetSize(dbInfo->vgArray));
taosArrayDestroy(pVgList);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
vgInfo = *p;
SVgroupInfo* pNewVg = taosMemoryMalloc(sizeof(SVgroupInfo)); SVgroupInfo* pNewVg = taosMemoryMalloc(sizeof(SVgroupInfo));
if (NULL == pNewVg) { if (NULL == pNewVg) {
taosArrayDestroy(pVgList);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
@ -1012,8 +983,6 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo*
} }
} }
taosArrayDestroy(pVgList);
CTG_RET(code); CTG_RET(code);
} }
@ -1057,7 +1026,33 @@ int32_t ctgDbVgVersionSortCompare(const void* key1, const void* key2) {
} }
} }
int32_t ctgMakeVgArray(SDBVgInfo* dbInfo) {
if (NULL == dbInfo) {
return TSDB_CODE_SUCCESS;
}
if (dbInfo->vgHash && NULL == dbInfo->vgArray) {
dbInfo->vgArray = taosArrayInit(100, sizeof(SVgroupInfo));
if (NULL == dbInfo->vgArray) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
while (pIter) {
taosArrayPush(dbInfo->vgArray, pIter);
pIter = taosHashIterate(dbInfo->vgHash, pIter);
}
taosArraySort(dbInfo->vgArray, ctgVgInfoComp);
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgCloneVgInfo(SDBVgInfo* src, SDBVgInfo** dst) { int32_t ctgCloneVgInfo(SDBVgInfo* src, SDBVgInfo** dst) {
CTG_ERR_RET(ctgMakeVgArray(src));
*dst = taosMemoryMalloc(sizeof(SDBVgInfo)); *dst = taosMemoryMalloc(sizeof(SDBVgInfo));
if (NULL == *dst) { if (NULL == *dst) {
qError("malloc %d failed", (int32_t)sizeof(SDBVgInfo)); qError("malloc %d failed", (int32_t)sizeof(SDBVgInfo));
@ -1090,6 +1085,10 @@ int32_t ctgCloneVgInfo(SDBVgInfo* src, SDBVgInfo** dst) {
pIter = taosHashIterate(src->vgHash, pIter); pIter = taosHashIterate(src->vgHash, pIter);
} }
if (src->vgArray) {
(*dst)->vgArray = taosArrayDup(src->vgArray, NULL);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -36,7 +36,7 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe
(*pRsp)->precision = 0; (*pRsp)->precision = 0;
(*pRsp)->compressed = 0; (*pRsp)->compressed = 0;
(*pRsp)->compLen = 0; (*pRsp)->compLen = 0;
(*pRsp)->numOfRows = htonl(pBlock->info.rows); (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
(*pRsp)->numOfCols = htonl(numOfCols); (*pRsp)->numOfCols = htonl(numOfCols);
int32_t len = blockEncode(pBlock, (*pRsp)->data, numOfCols); int32_t len = blockEncode(pBlock, (*pRsp)->data, numOfCols);

View File

@ -1663,7 +1663,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
} }
rsp->completed = 1; rsp->completed = 1;
rsp->numOfRows = htonl(rowNum); rsp->numOfRows = htobe64((int64_t)rowNum);
int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock)); int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock));
ASSERT(len == rspSize - sizeof(SRetrieveTableRsp)); ASSERT(len == rspSize - sizeof(SRetrieveTableRsp));

View File

@ -707,7 +707,7 @@ void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pC
int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart); int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart);
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs, void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
SOperatorInfo* pOperator); SOperatorInfo* pOperator);
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order); STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);

View File

@ -115,14 +115,14 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
if (pRsp->completed == 1) { if (pRsp->completed == 1) {
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
", total:%.2f Kb, try next %d/%" PRIzu, ", total:%.2f Kb, try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, i + 1, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, i + 1,
totalSources); totalSources);
} else { } else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb", " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks,
pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0); pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
} }
@ -367,14 +367,14 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
pSourceDataInfo->pRsp = pMsg->pData; pSourceDataInfo->pRsp = pMsg->pData;
SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp; SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
pRsp->numOfRows = htonl(pRsp->numOfRows); pRsp->numOfRows = htobe64(pRsp->numOfRows);
pRsp->compLen = htonl(pRsp->compLen); pRsp->compLen = htonl(pRsp->compLen);
pRsp->numOfCols = htonl(pRsp->numOfCols); pRsp->numOfCols = htonl(pRsp->numOfCols);
pRsp->useconds = htobe64(pRsp->useconds); pRsp->useconds = htobe64(pRsp->useconds);
pRsp->numOfBlocks = htonl(pRsp->numOfBlocks); pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
ASSERT(pRsp != NULL); ASSERT(pRsp != NULL);
qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d, %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks,
pRsp->numOfRows, pExchangeInfo); pRsp->numOfRows, pExchangeInfo);
} else { } else {
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
@ -472,7 +472,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs, void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
SOperatorInfo* pOperator) { SOperatorInfo* pOperator) {
pInfo->totalRows += numOfRows; pInfo->totalRows += numOfRows;
pInfo->totalSize += dataLen; pInfo->totalSize += dataLen;
@ -652,7 +652,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
if (pRsp->completed == 1) { if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
@ -661,7 +661,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
pExchangeInfo->current += 1; pExchangeInfo->current += 1;
} else { } else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64 ", totalRows:%" PRIu64
", totalBytes:%" PRIu64, ", totalBytes:%" PRIu64,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
pLoadInfo->totalRows, pLoadInfo->totalSize); pLoadInfo->totalRows, pLoadInfo->totalSize);

View File

@ -447,6 +447,19 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void freeVgInfo(SDBVgInfo* vgInfo) {
if (NULL == vgInfo) {
return;
}
taosHashCleanup(vgInfo->vgHash);
taosArrayDestroy(vgInfo->vgArray);
taosMemoryFreeClear(vgInfo);
}
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) { int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
if (NULL == pSrc) { if (NULL == pSrc) {
*pDst = NULL; *pDst = NULL;
@ -475,8 +488,7 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
if (0 != taosHashPut((*pDst)->vgHash, vgId, sizeof(*vgId), vgInfo, sizeof(*vgInfo))) { if (0 != taosHashPut((*pDst)->vgHash, vgId, sizeof(*vgId), vgInfo, sizeof(*vgInfo))) {
qError("taosHashPut failed, vgId:%d", vgInfo->vgId); qError("taosHashPut failed, vgId:%d", vgInfo->vgId);
taosHashCancelIterate(pSrc->vgHash, pIter); taosHashCancelIterate(pSrc->vgHash, pIter);
taosHashCleanup((*pDst)->vgHash); freeVgInfo(*pDst);
taosMemoryFreeClear(*pDst);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }

View File

@ -37,7 +37,7 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete)
rsp->precision = input->precision; rsp->precision = input->precision;
rsp->compressed = input->compressed; rsp->compressed = input->compressed;
rsp->compLen = htonl(len); rsp->compLen = htonl(len);
rsp->numOfRows = htonl(input->numOfRows); rsp->numOfRows = htobe64(input->numOfRows);
rsp->numOfCols = htonl(input->numOfCols); rsp->numOfCols = htonl(input->numOfCols);
rsp->numOfBlocks = htonl(input->numOfBlocks); rsp->numOfBlocks = htonl(input->numOfBlocks);
} }

View File

@ -275,7 +275,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
QW_ERR_RET(code); QW_ERR_RET(code);
} }
QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %d", pOutput->numOfBlocks, QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %"PRId64, pOutput->numOfBlocks,
pOutput->numOfRows); pOutput->numOfRows);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
@ -320,19 +320,19 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
pOutput->numOfBlocks++; pOutput->numOfBlocks++;
if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) { if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %d", pOutput->numOfBlocks, QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks,
pOutput->numOfRows); pOutput->numOfRows);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
break; break;
} }
if (0 == ctx->level) { if (0 == ctx->level) {
QW_TASK_DLOG("task fetched blocks %d rows %d, level %d", pOutput->numOfBlocks, pOutput->numOfRows, ctx->level); QW_TASK_DLOG("task fetched blocks %d rows %"PRId64", level %d", pOutput->numOfBlocks, pOutput->numOfRows, ctx->level);
break; break;
} }
if (pOutput->numOfRows >= QW_MIN_RES_ROWS) { if (pOutput->numOfRows >= QW_MIN_RES_ROWS) {
QW_TASK_DLOG("task fetched blocks %d rows %d reaches the min rows", pOutput->numOfBlocks, pOutput->numOfRows); QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks, pOutput->numOfRows);
break; break;
} }
} }

View File

@ -297,7 +297,7 @@ typedef struct SSchJob {
SExecResult execRes; SExecResult execRes;
void *fetchRes; // TODO free it or not void *fetchRes; // TODO free it or not
bool fetched; bool fetched;
int32_t resNumOfRows; int64_t resNumOfRows; // from int32_t to int64_t
SSchResInfo userRes; SSchResInfo userRes;
char *sql; char *sql;
SQueryProfileSummary summary; SQueryProfileSummary summary;

View File

@ -412,7 +412,7 @@ int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) {
SCH_JOB_DLOG("empty res and set query complete, code:%x", code); SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
} }
SCH_JOB_DLOG("fetch done, totalRows:%d", pJob->resNumOfRows); SCH_JOB_DLOG("fetch done, totalRows:%" PRId64, pJob->resNumOfRows);
_return: _return:
@ -526,9 +526,9 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
void schProcessOnDataFetched(SSchJob *pJob) { schPostJobRes(pJob, SCH_OP_FETCH); } void schProcessOnDataFetched(SSchJob *pJob) { schPostJobRes(pJob, SCH_OP_FETCH); }
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) { int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed); SCH_TASK_DLOG("got explain rsp, rows:%" PRId64 ", complete:%d", htobe64(pRsp->numOfRows), pRsp->completed);
atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows)); atomic_store_64(&pJob->resNumOfRows, htobe64(pRsp->numOfRows));
atomic_store_ptr(&pJob->fetchRes, pRsp); atomic_store_ptr(&pJob->fetchRes, pRsp);
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);

View File

@ -108,13 +108,13 @@ int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rs
} }
atomic_store_ptr(&pJob->fetchRes, rsp); atomic_store_ptr(&pJob->fetchRes, rsp);
atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows)); atomic_add_fetch_64(&pJob->resNumOfRows, htobe64(rsp->numOfRows));
if (rsp->completed) { if (rsp->completed) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
} }
SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed); SCH_TASK_DLOG("got fetch rsp, rows:%" PRId64 ", complete:%d", htobe64(rsp->numOfRows), rsp->completed);
msg = NULL; msg = NULL;
schProcessOnDataFetched(pJob); schProcessOnDataFetched(pJob);
@ -279,7 +279,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
} }
} }
atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); atomic_add_fetch_64(&pJob->resNumOfRows, rsp->affectedRows);
SCH_TASK_DLOG("submit succeed, affectedRows:%d, blocks:%d", rsp->affectedRows, rsp->nBlocks); SCH_TASK_DLOG("submit succeed, affectedRows:%d, blocks:%d", rsp->affectedRows, rsp->nBlocks);
SCH_LOCK(SCH_WRITE, &pJob->resLock); SCH_LOCK(SCH_WRITE, &pJob->resLock);
@ -317,7 +317,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
tDecodeSVDeleteRsp(&coder, &rsp); tDecodeSVDeleteRsp(&coder, &rsp);
tDecoderClear(&coder); tDecoderClear(&coder);
atomic_add_fetch_32(&pJob->resNumOfRows, rsp.affectedRows); atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows);
SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows); SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows);
} }
@ -344,7 +344,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
SCH_ERR_JRET(schSaveJobExecRes(pJob, &rsp)); SCH_ERR_JRET(schSaveJobExecRes(pJob, &rsp));
atomic_add_fetch_32(&pJob->resNumOfRows, rsp.affectedRows); atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows);
taosMemoryFreeClear(msg); taosMemoryFreeClear(msg);

View File

@ -112,7 +112,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
pRetrieve->compressed = 0; pRetrieve->compressed = 0;
pRetrieve->completed = 1; pRetrieve->completed = 1;
pRetrieve->streamBlockType = pBlock->info.type; pRetrieve->streamBlockType = pBlock->info.type;
pRetrieve->numOfRows = htonl(pBlock->info.rows); pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
pRetrieve->numOfCols = htonl(numOfCols); pRetrieve->numOfCols = htonl(numOfCols);
pRetrieve->skey = htobe64(pBlock->info.window.skey); pRetrieve->skey = htobe64(pBlock->info.window.skey);
pRetrieve->ekey = htobe64(pBlock->info.window.ekey); pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
@ -189,7 +189,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
pRetrieve->compressed = 0; pRetrieve->compressed = 0;
pRetrieve->completed = 1; pRetrieve->completed = 1;
pRetrieve->streamBlockType = pBlock->info.type; pRetrieve->streamBlockType = pBlock->info.type;
pRetrieve->numOfRows = htonl(pBlock->info.rows); pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
pRetrieve->skey = htobe64(pBlock->info.window.skey); pRetrieve->skey = htobe64(pBlock->info.window.skey);
pRetrieve->ekey = htobe64(pBlock->info.window.ekey); pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
pRetrieve->version = htobe64(pBlock->info.version); pRetrieve->version = htobe64(pBlock->info.version);

View File

@ -294,8 +294,8 @@ static int32_t cfgSetTfsItem(SConfig *pCfg, const char *name, const char *value,
SDiskCfg cfg = {0}; SDiskCfg cfg = {0};
tstrncpy(cfg.dir, value, sizeof(cfg.dir)); tstrncpy(cfg.dir, value, sizeof(cfg.dir));
cfg.level = atoi(level); cfg.level = level ? atoi(level) : 0;
cfg.primary = atoi(primary); cfg.primary = primary ? atoi(primary) : 1;
void *ret = taosArrayPush(pItem->array, &cfg); void *ret = taosArrayPush(pItem->array, &cfg);
if (ret == NULL) { if (ret == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -660,13 +660,12 @@ int32_t cfgLoadFromEnvVar(SConfig *pConfig) {
if (vlen3 != 0) value3[vlen3] = 0; if (vlen3 != 0) value3[vlen3] = 0;
} }
if (value2 != NULL && value3 != NULL && value2[0] != 0 && value3[0] != 0 && strcasecmp(name, "dataDir") == 0) { if (strcasecmp(name, "dataDir") == 0) {
code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_ENV_VAR); code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_ENV_VAR);
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break; if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
} else {
code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_VAR);
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
} }
code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_VAR);
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
} }
uInfo("load from env variables cfg success"); uInfo("load from env variables cfg success");
@ -703,13 +702,12 @@ int32_t cfgLoadFromEnvCmd(SConfig *pConfig, const char **envCmd) {
if (vlen3 != 0) value3[vlen3] = 0; if (vlen3 != 0) value3[vlen3] = 0;
} }
if (value2 != NULL && value3 != NULL && value2[0] != 0 && value3[0] != 0 && strcasecmp(name, "dataDir") == 0) { if (strcasecmp(name, "dataDir") == 0) {
code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_ENV_CMD); code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_ENV_CMD);
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break; if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
} else {
code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_CMD);
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
} }
code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_CMD);
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
} }
uInfo("load from env cmd cfg success"); uInfo("load from env cmd cfg success");
@ -768,13 +766,12 @@ int32_t cfgLoadFromEnvFile(SConfig *pConfig, const char *envFile) {
if (vlen3 != 0) value3[vlen3] = 0; if (vlen3 != 0) value3[vlen3] = 0;
} }
if (value2 != NULL && value3 != NULL && value2[0] != 0 && value3[0] != 0 && strcasecmp(name, "dataDir") == 0) { if (strcasecmp(name, "dataDir") == 0) {
code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_ENV_FILE); code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_ENV_FILE);
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break; if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
} else {
code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_FILE);
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
} }
code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_FILE);
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
} }
taosCloseFile(&pFile); taosCloseFile(&pFile);
@ -828,13 +825,12 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) {
if (vlen3 != 0) value3[vlen3] = 0; if (vlen3 != 0) value3[vlen3] = 0;
} }
if (value2 != NULL && value3 != NULL && value2[0] != 0 && value3[0] != 0 && strcasecmp(name, "dataDir") == 0) { if (strcasecmp(name, "dataDir") == 0) {
code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_CFG_FILE); code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_CFG_FILE);
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break; if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
} else {
code = cfgSetItem(pConfig, name, value, CFG_STYPE_CFG_FILE);
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
} }
code = cfgSetItem(pConfig, name, value, CFG_STYPE_CFG_FILE);
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
} }
taosCloseFile(&pFile); taosCloseFile(&pFile);
@ -895,7 +891,7 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) {
// code = cfgSetItem(pConfig, name, value, CFG_STYPE_CFG_FILE); // code = cfgSetItem(pConfig, name, value, CFG_STYPE_CFG_FILE);
// if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break; // if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
// if (value2 != NULL && value3 != NULL && value2[0] != 0 && value3[0] != 0 && strcasecmp(name, "dataDir") == 0) { // if (strcasecmp(name, "dataDir") == 0) {
// code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_CFG_FILE); // code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_CFG_FILE);
// if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break; // if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
// } // }
@ -993,13 +989,12 @@ int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) {
paGetToken(value2 + vlen2 + 1, &value3, &vlen3); paGetToken(value2 + vlen2 + 1, &value3, &vlen3);
if (vlen3 != 0) value3[vlen3] = 0; if (vlen3 != 0) value3[vlen3] = 0;
} }
if (value2 != NULL && value3 != NULL && value2[0] != 0 && value3[0] != 0 && strcasecmp(name, "dataDir") == 0) { if (strcasecmp(name, "dataDir") == 0) {
code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_APOLLO_URL); code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_APOLLO_URL);
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break; if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
} else {
code = cfgSetItem(pConfig, name, value, CFG_STYPE_APOLLO_URL);
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
} }
code = cfgSetItem(pConfig, name, value, CFG_STYPE_APOLLO_URL);
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
} }
} }
tjsonDelete(pJson); tjsonDelete(pJson);

View File

@ -62,8 +62,8 @@ def taos_command (buildPath, key, value, expectString, cfgDir, sqlString='', key
print ('taos login success! Here can run sql, taos> ') print ('taos login success! Here can run sql, taos> ')
if len(sqlString) != 0: if len(sqlString) != 0:
child.sendline (sqlString) child.sendline (sqlString)
w = child.expect(["Query OK", taosExpect.TIMEOUT, taosExpect.EOF], timeout=10) w = child.expect(["Query OK", "Create OK", "Insert OK", "Drop OK", taosExpect.TIMEOUT, taosExpect.EOF], timeout=10)
if w == 0: if w == 0 or w == 1 or w == 2:
return "TAOS_OK" return "TAOS_OK"
else: else:
print(1) print(1)
@ -233,7 +233,7 @@ class TDTestCase:
tdLog.printNoPrefix("================================ parameter: -s") tdLog.printNoPrefix("================================ parameter: -s")
newDbName="dbss" newDbName="dbss"
keyDict['s'] = "\"create database " + newDbName + "\"" keyDict['s'] = "\"create database " + newDbName + "\""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') retCode = taos_command(buildPath, "s", keyDict['s'], "Create OK", keyDict['c'], '', '', '')
if retCode != "TAOS_OK": if retCode != "TAOS_OK":
tdLog.exit("taos -s fail") tdLog.exit("taos -s fail")
@ -246,17 +246,17 @@ class TDTestCase:
tdLog.exit("create db fail after taos -s %s fail"%(keyDict['s'])) tdLog.exit("create db fail after taos -s %s fail"%(keyDict['s']))
keyDict['s'] = "\"create table " + newDbName + ".stb (ts timestamp, c int) tags (t int)\"" keyDict['s'] = "\"create table " + newDbName + ".stb (ts timestamp, c int) tags (t int)\""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') retCode = taos_command(buildPath, "s", keyDict['s'], "Create OK", keyDict['c'], '', '', '')
if retCode != "TAOS_OK": if retCode != "TAOS_OK":
tdLog.exit("taos -s create table fail") tdLog.exit("taos -s create table fail")
keyDict['s'] = "\"create table " + newDbName + ".ctb0 using " + newDbName + ".stb tags (0) " + newDbName + ".ctb1 using " + newDbName + ".stb tags (1)\"" keyDict['s'] = "\"create table " + newDbName + ".ctb0 using " + newDbName + ".stb tags (0) " + newDbName + ".ctb1 using " + newDbName + ".stb tags (1)\""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') retCode = taos_command(buildPath, "s", keyDict['s'], "Create OK", keyDict['c'], '', '', '')
if retCode != "TAOS_OK": if retCode != "TAOS_OK":
tdLog.exit("taos -s create table fail") tdLog.exit("taos -s create table fail")
keyDict['s'] = "\"insert into " + newDbName + ".ctb0 values('2021-04-01 08:00:00.000', 10)('2021-04-01 08:00:01.000', 20) " + newDbName + ".ctb1 values('2021-04-01 08:00:00.000', 11)('2021-04-01 08:00:01.000', 21)\"" keyDict['s'] = "\"insert into " + newDbName + ".ctb0 values('2021-04-01 08:00:00.000', 10)('2021-04-01 08:00:01.000', 20) " + newDbName + ".ctb1 values('2021-04-01 08:00:00.000', 11)('2021-04-01 08:00:01.000', 21)\""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", keyDict['c'], '', '', '')
if retCode != "TAOS_OK": if retCode != "TAOS_OK":
tdLog.exit("taos -s insert data fail") tdLog.exit("taos -s insert data fail")
@ -401,27 +401,27 @@ class TDTestCase:
tdLog.printNoPrefix("================================ parameter: -w") tdLog.printNoPrefix("================================ parameter: -w")
newDbName="dbw" newDbName="dbw"
keyDict['s'] = "\"create database " + newDbName + "\"" keyDict['s'] = "\"create database " + newDbName + "\""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') retCode = taos_command(buildPath, "s", keyDict['s'], "Create OK", keyDict['c'], '', '', '')
if retCode != "TAOS_OK": if retCode != "TAOS_OK":
tdLog.exit("taos -w fail") tdLog.exit("taos -w fail")
keyDict['s'] = "\"create table " + newDbName + ".ntb (ts timestamp, c binary(128))\"" keyDict['s'] = "\"create table " + newDbName + ".ntb (ts timestamp, c binary(128))\""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') retCode = taos_command(buildPath, "s", keyDict['s'], "Create OK", keyDict['c'], '', '', '')
if retCode != "TAOS_OK": if retCode != "TAOS_OK":
tdLog.exit("taos -w create table fail") tdLog.exit("taos -w create table fail")
keyDict['s'] = "\"insert into " + newDbName + ".ntb values('2021-04-01 08:00:00.001', 'abcd0123456789')('2021-04-01 08:00:00.002', 'abcd012345678901234567890123456789') \"" keyDict['s'] = "\"insert into " + newDbName + ".ntb values('2021-04-01 08:00:00.001', 'abcd0123456789')('2021-04-01 08:00:00.002', 'abcd012345678901234567890123456789') \""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", keyDict['c'], '', '', '')
if retCode != "TAOS_OK": if retCode != "TAOS_OK":
tdLog.exit("taos -w insert data fail") tdLog.exit("taos -w insert data fail")
keyDict['s'] = "\"insert into " + newDbName + ".ntb values('2021-04-01 08:00:00.003', 'aaaaaaaaaaaaaaaaaaaa')('2021-04-01 08:00:01.004', 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb') \"" keyDict['s'] = "\"insert into " + newDbName + ".ntb values('2021-04-01 08:00:00.003', 'aaaaaaaaaaaaaaaaaaaa')('2021-04-01 08:00:01.004', 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb') \""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", keyDict['c'], '', '', '')
if retCode != "TAOS_OK": if retCode != "TAOS_OK":
tdLog.exit("taos -w insert data fail") tdLog.exit("taos -w insert data fail")
keyDict['s'] = "\"insert into " + newDbName + ".ntb values('2021-04-01 08:00:00.005', 'cccccccccccccccccccc')('2021-04-01 08:00:01.006', 'dddddddddddddddddddddddddddddddddddddddd') \"" keyDict['s'] = "\"insert into " + newDbName + ".ntb values('2021-04-01 08:00:00.005', 'cccccccccccccccccccc')('2021-04-01 08:00:01.006', 'dddddddddddddddddddddddddddddddddddddddd') \""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", keyDict['c'], '', '', '')
if retCode != "TAOS_OK": if retCode != "TAOS_OK":
tdLog.exit("taos -w insert data fail") tdLog.exit("taos -w insert data fail")

View File

@ -147,19 +147,20 @@ class TDTestCase:
def __current_cfg(self): def __current_cfg(self):
cfg_list = [] cfg_list = []
current_case1 = [ current_case1 = [
f"dataDir {self.taos_data_dir}/{DATA_PRE0}0 {L0} {PRIMARY_DIR}", #f"dataDir {self.taos_data_dir}/{DATA_PRE0}0 {L0} {PRIMARY_DIR}",
f"dataDir {self.taos_data_dir}/{DATA_PRE0}1 {L0} {NON_PRIMARY_DIR}", f"dataDir {self.taos_data_dir}/{DATA_PRE0}1 {L0} {NON_PRIMARY_DIR}",
f"dataDir {self.taos_data_dir}/{DATA_PRE1}1 {L1} {NON_PRIMARY_DIR}", f"dataDir {self.taos_data_dir}/{DATA_PRE1}1 {L1} {NON_PRIMARY_DIR}",
f"dataDir {self.taos_data_dir}/{DATA_PRE2}2 {L2} {NON_PRIMARY_DIR}" f"dataDir {self.taos_data_dir}/{DATA_PRE2}2 {L2} {NON_PRIMARY_DIR}"
] ]
current_case2 = [f"dataDir {self.taos_data_dir}/{DATA_PRE0}0 {L0} {PRIMARY_DIR}"] #current_case2 = [f"dataDir {self.taos_data_dir}/{DATA_PRE0}0 {L0} {PRIMARY_DIR}"]
current_case2 = []
for i in range(9): for i in range(9):
current_case2.append(f"dataDir {self.taos_data_dir}/{DATA_PRE0}{i+1} {L0} {NON_PRIMARY_DIR}") current_case2.append(f"dataDir {self.taos_data_dir}/{DATA_PRE0}{i+1} {L0} {NON_PRIMARY_DIR}")
# TD-17773bug # TD-17773bug
current_case3 = [ current_case3 = [
f"dataDir {self.taos_data_dir}/{DATA_PRE0}0 ", #f"dataDir {self.taos_data_dir}/{DATA_PRE0}0 ",
f"dataDir {self.taos_data_dir}/{DATA_PRE0}1 {L0} {NON_PRIMARY_DIR}", f"dataDir {self.taos_data_dir}/{DATA_PRE0}1 {L0} {NON_PRIMARY_DIR}",
f"dataDir {self.taos_data_dir}/{DATA_PRE1}0 {L1} {NON_PRIMARY_DIR}", f"dataDir {self.taos_data_dir}/{DATA_PRE1}0 {L1} {NON_PRIMARY_DIR}",
f"dataDir {self.taos_data_dir}/{DATA_PRE2}0 {L2} {NON_PRIMARY_DIR}", f"dataDir {self.taos_data_dir}/{DATA_PRE2}0 {L2} {NON_PRIMARY_DIR}",

View File

@ -214,6 +214,18 @@ void shellRunSingleCommandImp(char *command) {
return; return;
} }
// pre string
char * pre = "Query OK";
if (shellRegexMatch(command, "^\\s*delete\\s*from\\s*.*", REG_EXTENDED | REG_ICASE)) {
pre = "Delete OK";
} else if(shellRegexMatch(command, "^\\s*insert\\s*into\\s*.*", REG_EXTENDED | REG_ICASE)) {
pre = "Insert OK";
} else if(shellRegexMatch(command, "^\\s*create\\s*.*", REG_EXTENDED | REG_ICASE)) {
pre = "Create OK";
} else if(shellRegexMatch(command, "^\\s*drop\\s*.*", REG_EXTENDED | REG_ICASE)) {
pre = "Drop OK";
}
TAOS_FIELD *pFields = taos_fetch_fields(pSql); TAOS_FIELD *pFields = taos_fetch_fields(pSql);
if (pFields != NULL) { // select and show kinds of commands if (pFields != NULL) { // select and show kinds of commands
int32_t error_no = 0; int32_t error_no = 0;
@ -229,10 +241,10 @@ void shellRunSingleCommandImp(char *command) {
} }
taos_free_result(pSql); taos_free_result(pSql);
} else { } else {
int32_t num_rows_affacted = taos_affected_rows(pSql); int64_t num_rows_affacted = taos_affected_rows64(pSql);
taos_free_result(pSql); taos_free_result(pSql);
et = taosGetTimestampUs(); et = taosGetTimestampUs();
printf("Query OK, %d row(s) affected (%.6fs)\r\n", num_rows_affacted, (et - st) / 1E6); printf("%s, %" PRId64 " row(s) affected (%.6fs)\r\n", pre, num_rows_affacted, (et - st) / 1E6);
// call auto tab // call auto tab
callbackAutoTab(command, NULL, false); callbackAutoTab(command, NULL, false);