Merge branch '3.0' of github.com:taosdata/TDengine into test/chr/TD-14699

This commit is contained in:
tomchon 2022-05-21 23:15:24 +08:00
commit dce1931365
64 changed files with 417 additions and 398 deletions

View File

@ -171,6 +171,7 @@ tmq_t* build_consumer() {
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq); assert(tmq);
tmq_conf_destroy(conf);
return tmq; return tmq;
} }

View File

@ -128,6 +128,7 @@ extern bool tsStartUdfd;
// schemaless // schemaless
extern char tsSmlChildTableName[]; extern char tsSmlChildTableName[];
extern char tsSmlTagName[];
extern bool tsSmlDataFormat; extern bool tsSmlDataFormat;
// internal // internal

View File

@ -1987,19 +1987,16 @@ static FORCE_INLINE void tFreeClientHbReq(void* pReq) {
if (req->info) { if (req->info) {
tFreeReqKvHash(req->info); tFreeReqKvHash(req->info);
taosHashCleanup(req->info); taosHashCleanup(req->info);
req->info = NULL;
} }
} }
int32_t tSerializeSClientHbBatchReq(void* buf, int32_t bufLen, const SClientHbBatchReq* pReq); int32_t tSerializeSClientHbBatchReq(void* buf, int32_t bufLen, const SClientHbBatchReq* pReq);
int32_t tDeserializeSClientHbBatchReq(void* buf, int32_t bufLen, SClientHbBatchReq* pReq); int32_t tDeserializeSClientHbBatchReq(void* buf, int32_t bufLen, SClientHbBatchReq* pReq);
static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) { static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq) {
SClientHbBatchReq* req = (SClientHbBatchReq*)pReq; SClientHbBatchReq* req = (SClientHbBatchReq*)pReq;
if (deep) { taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
} else {
taosArrayDestroy(req->reqs);
}
taosMemoryFree(pReq); taosMemoryFree(pReq);
} }
@ -2023,6 +2020,7 @@ static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) {
int32_t tSerializeSClientHbBatchRsp(void* buf, int32_t bufLen, const SClientHbBatchRsp* pBatchRsp); int32_t tSerializeSClientHbBatchRsp(void* buf, int32_t bufLen, const SClientHbBatchRsp* pBatchRsp);
int32_t tDeserializeSClientHbBatchRsp(void* buf, int32_t bufLen, SClientHbBatchRsp* pBatchRsp); int32_t tDeserializeSClientHbBatchRsp(void* buf, int32_t bufLen, SClientHbBatchRsp* pBatchRsp);
void tFreeSClientHbBatchRsp(SClientHbBatchRsp *pBatchRsp);
static FORCE_INLINE int32_t tEncodeSKv(SEncoder* pEncoder, const SKv* pKv) { static FORCE_INLINE int32_t tEncodeSKv(SEncoder* pEncoder, const SKv* pKv) {
if (tEncodeI32(pEncoder, pKv->key) < 0) return -1; if (tEncodeI32(pEncoder, pKv->key) < 0) return -1;
@ -2523,11 +2521,9 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum); buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
buf = taosDecodeFixedI32(buf, &pRsp->blockNum); buf = taosDecodeFixedI32(buf, &pRsp->blockNum);
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(int32_t));
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
if (pRsp->blockNum != 0) { if (pRsp->blockNum != 0) {
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(int32_t));
buf = taosDecodeFixedI8(buf, &pRsp->withTbName); buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
buf = taosDecodeFixedI8(buf, &pRsp->withSchema); buf = taosDecodeFixedI8(buf, &pRsp->withSchema);
buf = taosDecodeFixedI8(buf, &pRsp->withTag); buf = taosDecodeFixedI8(buf, &pRsp->withTag);
@ -2540,14 +2536,20 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
taosArrayPush(pRsp->blockDataLen, &bLen); taosArrayPush(pRsp->blockDataLen, &bLen);
taosArrayPush(pRsp->blockData, &data); taosArrayPush(pRsp->blockData, &data);
if (pRsp->withSchema) { if (pRsp->withSchema) {
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper)); SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
buf = taosDecodeSSchemaWrapper(buf, pSW); buf = taosDecodeSSchemaWrapper(buf, pSW);
taosArrayPush(pRsp->blockSchema, &pSW); taosArrayPush(pRsp->blockSchema, &pSW);
} else {
pRsp->blockSchema = NULL;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
char* name = NULL; char* name = NULL;
buf = taosDecodeString(buf, &name); buf = taosDecodeString(buf, &name);
taosArrayPush(pRsp->blockTbName, &name); taosArrayPush(pRsp->blockTbName, &name);
} else {
pRsp->blockTbName = NULL;
} }
} }
} }

View File

@ -72,7 +72,6 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad);
* @param pMsg The request message * @param pMsg The request message
*/ */
int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg); int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg);
int32_t qndProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -198,6 +198,8 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define NEED_CLIENT_HANDLE_ERROR(_code) \ #define NEED_CLIENT_HANDLE_ERROR(_code) \
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \ (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code)) NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_VND_CREATE_STB \
|| (_type) == TDMT_VND_DROP_TABLE || (_type) == TDMT_VND_DROP_STB)
#define NEED_SCHEDULER_RETRY_ERROR(_code) \ #define NEED_SCHEDULER_RETRY_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)

View File

@ -129,9 +129,8 @@ int32_t* taosGetErrno();
// mnode-common // mnode-common
#define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0300) #define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0300)
#define TSDB_CODE_MND_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0301) #define TSDB_CODE_MND_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0301)
#define TSDB_CODE_MND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0302) #define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0302)
#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0303) #define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0303)
#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0304)
// mnode-show // mnode-show
#define TSDB_CODE_MND_INVALID_SHOWOBJ TAOS_DEF_ERROR_CODE(0, 0x0310) #define TSDB_CODE_MND_INVALID_SHOWOBJ TAOS_DEF_ERROR_CODE(0, 0x0310)

View File

@ -60,7 +60,7 @@ static void registerRequest(SRequestObj *pRequest) {
static void deregisterRequest(SRequestObj *pRequest) { static void deregisterRequest(SRequestObj *pRequest) {
assert(pRequest != NULL); assert(pRequest != NULL);
STscObj * pTscObj = pRequest->pTscObj; STscObj *pTscObj = pRequest->pTscObj;
SInstanceSummary *pActivity = &pTscObj->pAppInfo->summary; SInstanceSummary *pActivity = &pTscObj->pAppInfo->summary;
int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1); int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
@ -313,7 +313,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
return 0; return 0;
} }
SConfig * pCfg = taosGetCfg(); SConfig *pCfg = taosGetCfg();
SConfigItem *pItem = NULL; SConfigItem *pItem = NULL;
switch (option) { switch (option) {

View File

@ -394,6 +394,10 @@ int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S
tscDebug("hb got %d expired users, valueLen:%d", userNum, kv.valueLen); tscDebug("hb got %d expired users, valueLen:%d", userNum, kv.valueLen);
if (NULL == req->info) {
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
}
taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -429,6 +433,10 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
tscDebug("hb got %d expired db, valueLen:%d", dbNum, kv.valueLen); tscDebug("hb got %d expired db, valueLen:%d", dbNum, kv.valueLen);
if (NULL == req->info) {
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
}
taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -463,6 +471,10 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC
tscDebug("hb got %d expired stb, valueLen:%d", stbNum, kv.valueLen); tscDebug("hb got %d expired stb, valueLen:%d", stbNum, kv.valueLen);
if (NULL == req->info) {
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
}
taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -511,16 +523,6 @@ static FORCE_INLINE void hbMgrInitHandle() {
hbMgrInitMqHbHandle(); hbMgrInitMqHbHandle();
} }
void hbFreeReq(void *req) {
SClientHbReq *pReq = (SClientHbReq *)req;
tFreeReqKvHash(pReq->info);
}
void hbClearClientHbReq(SClientHbReq *pReq) {
pReq->query = NULL;
pReq->info = NULL;
}
SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
SClientHbBatchReq *pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq)); SClientHbBatchReq *pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq));
if (pBatchReq == NULL) { if (pBatchReq == NULL) {
@ -535,6 +537,8 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
while (pIter != NULL) { while (pIter != NULL) {
SClientHbReq *pOneReq = pIter; SClientHbReq *pOneReq = pIter;
pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq);
SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey)); SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey));
if (info) { if (info) {
code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, info->param, pOneReq); code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, info->param, pOneReq);
@ -544,7 +548,6 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
} }
} }
taosArrayPush(pBatchReq->reqs, pOneReq);
//hbClearClientHbReq(pOneReq); //hbClearClientHbReq(pOneReq);
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
@ -601,8 +604,8 @@ static void *hbThreadFunc(void *param) {
void *buf = taosMemoryMalloc(tlen); void *buf = taosMemoryMalloc(tlen);
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tFreeClientHbBatchReq(pReq, false); tFreeClientHbBatchReq(pReq);
hbClearReqInfo(pAppHbMgr); //hbClearReqInfo(pAppHbMgr);
break; break;
} }
@ -611,8 +614,8 @@ static void *hbThreadFunc(void *param) {
if (pInfo == NULL) { if (pInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tFreeClientHbBatchReq(pReq, false); tFreeClientHbBatchReq(pReq);
hbClearReqInfo(pAppHbMgr); //hbClearReqInfo(pAppHbMgr);
taosMemoryFree(buf); taosMemoryFree(buf);
break; break;
} }
@ -628,8 +631,8 @@ static void *hbThreadFunc(void *param) {
int64_t transporterId = 0; int64_t transporterId = 0;
SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo); asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
tFreeClientHbBatchReq(pReq, false); tFreeClientHbBatchReq(pReq);
hbClearReqInfo(pAppHbMgr); //hbClearReqInfo(pAppHbMgr);
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
} }
@ -721,8 +724,7 @@ void appHbMgrCleanup(void) {
void *pIter = taosHashIterate(pTarget->activeInfo, NULL); void *pIter = taosHashIterate(pTarget->activeInfo, NULL);
while (pIter != NULL) { while (pIter != NULL) {
SClientHbReq *pOneReq = pIter; SClientHbReq *pOneReq = pIter;
hbFreeReq(pOneReq); tFreeClientHbReq(pOneReq);
taosHashCleanup(pOneReq->info);
pIter = taosHashIterate(pTarget->activeInfo, pIter); pIter = taosHashIterate(pTarget->activeInfo, pIter);
} }
taosHashCleanup(pTarget->activeInfo); taosHashCleanup(pTarget->activeInfo);
@ -782,7 +784,7 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
} }
SClientHbReq hbReq = {0}; SClientHbReq hbReq = {0};
hbReq.connKey = connKey; hbReq.connKey = connKey;
hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); //hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)); taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq));
@ -823,8 +825,7 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, in
void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
if (pReq) { if (pReq) {
hbFreeReq(pReq); tFreeClientHbReq(pReq);
taosHashCleanup(pReq->info);
taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
} }

View File

@ -345,6 +345,10 @@ int32_t validateSversion(SRequestObj* pRequest, void* res) {
for (int32_t i = 0; i < pRsp->nBlocks; ++i) { for (int32_t i = 0; i < pRsp->nBlocks; ++i) {
SSubmitBlkRsp* blk = pRsp->pBlocks + i; SSubmitBlkRsp* blk = pRsp->pBlocks + i;
if (NULL == blk->tblFName || 0 == blk->tblFName[0]) {
continue;
}
STbSVersion tbSver = {.tbFName = blk->tblFName, .sver = blk->sver}; STbSVersion tbSver = {.tbFName = blk->tblFName, .sver = blk->sver};
taosArrayPush(pArray, &tbSver); taosArrayPush(pArray, &tbSver);
} }
@ -383,14 +387,14 @@ _return:
} }
void freeRequestRes(SRequestObj* pRequest, void* res) { void freeRequestRes(SRequestObj* pRequest, void* res) {
if (NULL == res) { if (NULL == pRequest || NULL == res) {
return; return;
} }
if (TDMT_VND_SUBMIT == pRequest->type) { if (TDMT_VND_SUBMIT == pRequest->type) {
tFreeSSubmitRsp((SSubmitRsp*)res); tFreeSSubmitRsp((SSubmitRsp*)res);
} else if (TDMT_VND_QUERY == pRequest->type) { } else if (TDMT_VND_QUERY == pRequest->type) {
taosArrayDestroy((SArray *)res); taosArrayDestroy((SArray*)res);
} }
} }
@ -431,12 +435,13 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) { if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
pRequest->code = terrno; pRequest->code = terrno;
freeRequestRes(pRequest, pRes);
pRes = NULL;
} }
if (res) { if (res) {
*res = pRes; *res = pRes;
} else {
freeRequestRes(pRequest, pRes);
pRes = NULL;
} }
return pRequest; return pRequest;
@ -499,6 +504,23 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
return code; return code;
} }
int32_t removeMeta(STscObj* pTscObj, SArray* tbList) {
SCatalog* pCatalog = NULL;
int32_t tbNum = taosArrayGetSize(tbList);
int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
for (int32_t i = 0; i < tbNum; ++i) {
SName* pTbName = taosArrayGet(tbList, i);
catalogRemoveTableMeta(pCatalog, pTbName);
}
return TSDB_CODE_SUCCESS;
}
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
int32_t retryNum = 0; int32_t retryNum = 0;
@ -518,6 +540,10 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
} }
} while (retryNum++ < REQUEST_MAX_TRY_TIMES); } while (retryNum++ < REQUEST_MAX_TRY_TIMES);
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
removeMeta(pTscObj, pRequest->tableList);
}
return pRequest; return pRequest;
} }

View File

@ -146,10 +146,10 @@ void taos_free_result(TAOS_RES *res) {
SMqRspObj *pRsp = (SMqRspObj *)res; SMqRspObj *pRsp = (SMqRspObj *)res;
if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen); if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen);
if (pRsp->rsp.blockSchema) taosArrayDestroy(pRsp->rsp.blockSchema);
if (pRsp->rsp.blockTbName) taosArrayDestroy(pRsp->rsp.blockTbName);
if (pRsp->rsp.blockTags) taosArrayDestroy(pRsp->rsp.blockTags); if (pRsp->rsp.blockTags) taosArrayDestroy(pRsp->rsp.blockTags);
if (pRsp->rsp.blockTagSchema) taosArrayDestroy(pRsp->rsp.blockTagSchema); if (pRsp->rsp.blockTagSchema) taosArrayDestroy(pRsp->rsp.blockTagSchema);
if (pRsp->rsp.withTbName) taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree);
if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
pRsp->resInfo.pRspMsg = NULL; pRsp->resInfo.pRspMsg = NULL;
doFreeReqResultInfo(&pRsp->resInfo); doFreeReqResultInfo(&pRsp->resInfo);
} }

View File

@ -63,10 +63,6 @@ for (int i = 1; i < keyLen; ++i) { \
#define TS "_ts" #define TS "_ts"
#define TS_LEN 3 #define TS_LEN 3
#define TAG "_tag"
#define TAG_LEN 4
#define TAG_VALUE "NULL"
#define TAG_VALUE_LEN 4
#define VALUE "value" #define VALUE "value"
#define VALUE_LEN 5 #define VALUE_LEN 5
@ -263,7 +259,7 @@ static int32_t smlBuildColumnDescription(SSmlKv* field, char* buf, int32_t bufSi
memcpy(tname, field->key, field->keyLen); memcpy(tname, field->key, field->keyLen);
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
int32_t bytes = field->length > CHAR_SAVE_LENGTH ? (2*field->length) : CHAR_SAVE_LENGTH; int32_t bytes = field->length > CHAR_SAVE_LENGTH ? (2*field->length) : CHAR_SAVE_LENGTH;
int out = snprintf(buf, bufSize,"`%s` %s(%d)", int out = snprintf(buf, bufSize, "`%s` %s(%d)",
tname, tDataTypes[field->type].name, bytes); tname, tDataTypes[field->type].name, bytes);
*outBytes = out; *outBytes = out;
} else { } else {
@ -400,6 +396,12 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
pos += outBytes; freeBytes -= outBytes; pos += outBytes; freeBytes -= outBytes;
*pos = ','; ++pos; --freeBytes; *pos = ','; ++pos; --freeBytes;
} }
if(taosArrayGetSize(cols) == 0){
outBytes = snprintf(pos, freeBytes,"`%s` %s(%d)",
tsSmlTagName, tDataTypes[TSDB_DATA_TYPE_NCHAR].name, CHAR_SAVE_LENGTH);
pos += outBytes; freeBytes -= outBytes;
*pos = ','; ++pos; --freeBytes;
}
pos--; ++freeBytes; pos--; ++freeBytes;
outBytes = snprintf(pos, freeBytes, ")"); outBytes = snprintf(pos, freeBytes, ")");
TAOS_RES* res = taos_query(info->taos, result); TAOS_RES* res = taos_query(info->taos, result);
@ -724,9 +726,6 @@ static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) {
if(value + len != endPtr){ if(value + len != endPtr){
return -1; return -1;
} }
if(tsInt64 == 0){
return taosGetTimestampNs();
}
double ts = tsInt64; double ts = tsInt64;
switch (type) { switch (type) {
case TSDB_TIME_PRECISION_HOURS: case TSDB_TIME_PRECISION_HOURS:
@ -792,8 +791,8 @@ static int8_t smlGetTsTypeByPrecision(int8_t precision) {
} }
static int64_t smlParseInfluxTime(SSmlHandle* info, const char* data, int32_t len){ static int64_t smlParseInfluxTime(SSmlHandle* info, const char* data, int32_t len){
if(len == 0){ if(len == 0 || (len == 1 && data[0] == '0')){
return taosGetTimestamp(TSDB_TIME_PRECISION_NANO); return taosGetTimestampNs();
} }
int8_t tsType = smlGetTsTypeByPrecision(info->precision); int8_t tsType = smlGetTsTypeByPrecision(info->precision);
@ -815,6 +814,9 @@ static int64_t smlParseOpenTsdbTime(SSmlHandle* info, const char* data, int32_t
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL); smlBuildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL);
return -1; return -1;
} }
if(len == 1 && data[0] == '0'){
return taosGetTimestampNs();
}
int8_t tsType = smlGetTsTypeByLen(len); int8_t tsType = smlGetTsTypeByLen(len);
if (tsType == -1) { if (tsType == -1) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data); smlBuildInvalidDataMsg(&info->msgBuf, "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data);
@ -1112,14 +1114,6 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char* sql, SSmlTable
static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, char *childTableName, bool isTag, SHashObj *dumplicateKey, SSmlMsgBuf *msg){ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, char *childTableName, bool isTag, SHashObj *dumplicateKey, SSmlMsgBuf *msg){
if(isTag && len == 0){ if(isTag && len == 0){
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if(!kv) return TSDB_CODE_OUT_OF_MEMORY;
kv->key = TAG;
kv->keyLen = TAG_LEN;
kv->value = TAG_VALUE;
kv->length = TAG_VALUE_LEN;
kv->type = TSDB_DATA_TYPE_NCHAR;
if(cols) taosArrayPush(cols, &kv);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -47,8 +47,14 @@ int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
} }
break; break;
case STMT_EXECUTE: case STMT_EXECUTE:
if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS)) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
code = TSDB_CODE_TSC_STMT_API_ERROR; if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS) && STMT_STATUS_NE(BIND) && STMT_STATUS_NE(BIND_COL)) {
code = TSDB_CODE_TSC_STMT_API_ERROR;
}
} else {
if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS)) {
code = TSDB_CODE_TSC_STMT_API_ERROR;
}
} }
break; break;
default: default:
@ -794,6 +800,7 @@ int stmtExec(TAOS_STMT* stmt) {
if (code) { if (code) {
pStmt->exec.pRequest->code = code; pStmt->exec.pRequest->code = code;
} else { } else {
tFreeSSubmitRsp(pRsp);
STMT_ERR_RET(stmtResetStmt(pStmt)); STMT_ERR_RET(stmtResetStmt(pStmt));
STMT_ERR_RET(TSDB_CODE_NEED_RETRY); STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
} }
@ -811,11 +818,13 @@ _return:
if (TSDB_CODE_SUCCESS == code && autoCreateTbl) { if (TSDB_CODE_SUCCESS == code && autoCreateTbl) {
if (NULL == pRsp) { if (NULL == pRsp) {
tscError("no submit resp got for auto create table"); tscError("no submit resp got for auto create table");
STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR); code = TSDB_CODE_TSC_APP_ERROR;
} else {
code = stmtUpdateTableUid(pStmt, pRsp);
} }
STMT_ERR_RET(stmtUpdateTableUid(pStmt, pRsp));
} }
tFreeSSubmitRsp(pRsp);
++pStmt->sql.runTimes; ++pStmt->sql.runTimes;

View File

@ -202,7 +202,12 @@ tmq_conf_t* tmq_conf_new() {
} }
void tmq_conf_destroy(tmq_conf_t* conf) { void tmq_conf_destroy(tmq_conf_t* conf) {
if (conf) taosMemoryFree(conf); if (conf) {
if (conf->ip) taosMemoryFree(conf->ip);
if (conf->user) taosMemoryFree(conf->user);
if (conf->pass) taosMemoryFree(conf->pass);
taosMemoryFree(conf);
}
} }
tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) { tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
@ -497,6 +502,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
} else { } else {
ASSERT(0); ASSERT(0);
} }
taosFreeQitem(pTaskType);
} }
taosFreeQall(qall); taosFreeQall(qall);
return 0; return 0;
@ -954,8 +960,12 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqClientVg* pVg = pParam->pVg; SMqClientVg* pVg = pParam->pVg;
SMqClientTopic* pTopic = pParam->pTopic; SMqClientTopic* pTopic = pParam->pTopic;
tmq_t* tmq = pParam->tmq; tmq_t* tmq = pParam->tmq;
int32_t vgId = pParam->vgId;
int32_t epoch = pParam->epoch;
taosMemoryFree(pParam);
if (code != 0) { if (code != 0) {
tscWarn("msg discard from vg %d, epoch %d, code:%x", pParam->vgId, pParam->epoch, code); tscWarn("msg discard from vg %d, epoch %d, code:%x", vgId, epoch, code);
if (pMsg->pData) taosMemoryFree(pMsg->pData);
goto CREATE_MSG_FAIL; goto CREATE_MSG_FAIL;
} }
@ -963,19 +973,21 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t tmqEpoch = atomic_load_32(&tmq->epoch); int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
if (msgEpoch < tmqEpoch) { if (msgEpoch < tmqEpoch) {
// do not write into queue since updating epoch reset // do not write into queue since updating epoch reset
tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", pParam->vgId, msgEpoch, tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
tmqEpoch); tmqEpoch);
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
taosMemoryFree(pMsg->pData);
return 0; return 0;
} }
if (msgEpoch != tmqEpoch) { if (msgEpoch != tmqEpoch) {
tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch); tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
} }
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
if (pRspWrapper == NULL) { if (pRspWrapper == NULL) {
tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch); taosMemoryFree(pMsg->pData);
tscWarn("msg discard from vg %d, epoch %d since out of memory", vgId, epoch);
goto CREATE_MSG_FAIL; goto CREATE_MSG_FAIL;
} }
@ -986,6 +998,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
memcpy(&pRspWrapper->msg, pMsg->pData, sizeof(SMqRspHead)); memcpy(&pRspWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg); tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg);
taosMemoryFree(pMsg->pData);
tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId, tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId,
pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset); pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset);
@ -995,7 +1008,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
return 0; return 0;
CREATE_MSG_FAIL: CREATE_MSG_FAIL:
if (pParam->epoch == tmq->epoch) { if (epoch == tmq->epoch) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
} }
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
@ -1088,6 +1101,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
tmq_t* tmq = pParam->tmq; tmq_t* tmq = pParam->tmq;
int8_t async = pParam->async;
pParam->code = code; pParam->code = code;
if (code != 0) { if (code != 0) {
tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async); tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
@ -1104,7 +1118,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
goto END; goto END;
} }
if (!pParam->async) { if (!async) {
SMqAskEpRsp rsp; SMqAskEpRsp rsp;
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
/*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/ /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
@ -1125,13 +1139,14 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
taosWriteQitem(tmq->mqueue, pWrapper); taosWriteQitem(tmq->mqueue, pWrapper);
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
taosMemoryFree(pParam);
} }
END: END:
/*atomic_store_8(&tmq->epStatus, 0);*/ /*atomic_store_8(&tmq->epStatus, 0);*/
if (!pParam->async) { if (!async) {
tsem_post(&pParam->rspSem); tsem_post(&pParam->rspSem);
} else {
taosMemoryFree(pParam);
} }
return code; return code;
} }
@ -1279,7 +1294,6 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols); setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
} }
taosFreeQitem(pWrapper);
return pRspObj; return pRspObj;
} }
@ -1401,6 +1415,7 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t waitTime, bool pollIfReset) {
} }
// build rsp // build rsp
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper); SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
taosFreeQitem(pollRspWrapper);
return pRsp; return pRsp;
} else { } else {
/*printf("epoch mismatch\n");*/ /*printf("epoch mismatch\n");*/

View File

@ -269,16 +269,7 @@ TEST(testCase, smlParseCols_tag_Test) {
ret = smlParseCols(data, len, cols, NULL, true, dumplicateKey, &msgBuf); ret = smlParseCols(data, len, cols, NULL, true, dumplicateKey, &msgBuf);
ASSERT_EQ(ret, TSDB_CODE_SUCCESS); ASSERT_EQ(ret, TSDB_CODE_SUCCESS);
size = taosArrayGetSize(cols); size = taosArrayGetSize(cols);
ASSERT_EQ(size, 1); ASSERT_EQ(size, 0);
// nchar
kv = (SSmlKv *)taosArrayGetP(cols, 0);
ASSERT_EQ(strncasecmp(kv->key, TAG, TAG_LEN), 0);
ASSERT_EQ(kv->keyLen, TAG_LEN);
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR);
ASSERT_EQ(kv->length, TAG_LEN);
ASSERT_EQ(strncasecmp(kv->value, TAG_VALUE, TAG_VALUE_LEN), 0);
taosMemoryFree(kv);
taosArrayDestroy(cols); taosArrayDestroy(cols);
taosHashCleanup(dumplicateKey); taosHashCleanup(dumplicateKey);
@ -1207,7 +1198,8 @@ TEST(testCase, sml_TD15662_Test) {
ASSERT_NE(info, nullptr); ASSERT_NE(info, nullptr);
const char *sql[] = { const char *sql[] = {
"hetrey,id=sub_table_0123456,t0=f,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=f,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"binaryColValue\",c8=L\"ncharColValue\",c9=7u64", "hetrey c0=f,c1=127i8 1626006833639",
"hetrey,t1=r c0=f,c1=127i8 1626006833640",
}; };
int ret = smlProcess(info, (char **)sql, sizeof(sql) / sizeof(sql[0])); int ret = smlProcess(info, (char **)sql, sizeof(sql) / sizeof(sql[0]));
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);

View File

@ -1077,7 +1077,7 @@ void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) {
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) { SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size; int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size;
if (tlen == 0) return NULL; // if (tlen == 0) return NULL; // nCols == 0 means no tags
tlen += TD_KV_ROW_HEAD_SIZE; tlen += TD_KV_ROW_HEAD_SIZE;
@ -1087,8 +1087,10 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
kvRowSetNCols(row, pBuilder->nCols); kvRowSetNCols(row, pBuilder->nCols);
kvRowSetLen(row, tlen); kvRowSetLen(row, tlen);
memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols); if(pBuilder->nCols > 0){
memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size); memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols);
memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size);
}
return row; return row;
} }

View File

@ -78,6 +78,7 @@ char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com";
uint16_t tsTelemPort = 80; uint16_t tsTelemPort = 80;
// schemaless // schemaless
char tsSmlTagName[TSDB_COL_NAME_LEN] = "_tag_null";
char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; //user defined child table name can be specified in tag value. char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; //user defined child table name can be specified in tag value.
//If set to empty system will generate table name using MD5 hash. //If set to empty system will generate table name using MD5 hash.
bool tsSmlDataFormat = true; // true means that the name and order of cols in each line are the same(only for influx protocol) bool tsSmlDataFormat = true; // true means that the name and order of cols in each line are the same(only for influx protocol)
@ -326,6 +327,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "keepColumnName", tsKeepOriginalColumnName, 1) != 0) return -1; if (cfgAddBool(pCfg, "keepColumnName", tsKeepOriginalColumnName, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1; if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1;
if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1; if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1;
if (cfgAddString(pCfg, "smlTagNullName", tsSmlTagName, 1) != 0) return -1;
if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1; if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1;
tsNumOfTaskQueueThreads = tsNumOfCores / 4; tsNumOfTaskQueueThreads = tsNumOfCores / 4;
@ -522,6 +524,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
} }
tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN); tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN);
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagNullName")->str, TSDB_COL_NAME_LEN);
tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval; tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval;
tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32; tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32;

View File

@ -308,13 +308,10 @@ static int compareKv(const void* p1, const void* p2) {
* use stable name and tags to grearate child table name * use stable name and tags to grearate child table name
*/ */
void buildChildTableName(RandTableName* rName) { void buildChildTableName(RandTableName* rName) {
int32_t size = taosArrayGetSize(rName->tags);
ASSERT(size > 0);
taosArraySort(rName->tags, compareKv);
SStringBuilder sb = {0}; SStringBuilder sb = {0};
taosStringBuilderAppendStringLen(&sb, rName->sTableName, rName->sTableNameLen); taosStringBuilderAppendStringLen(&sb, rName->sTableName, rName->sTableNameLen);
for (int j = 0; j < size; ++j) { taosArraySort(rName->tags, compareKv);
for (int j = 0; j < taosArrayGetSize(rName->tags); ++j) {
SSmlKv* tagKv = taosArrayGetP(rName->tags, j); SSmlKv* tagKv = taosArrayGetP(rName->tags, j);
taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen); taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen);
if(IS_VAR_DATA_TYPE(tagKv->type)){ if(IS_VAR_DATA_TYPE(tagKv->type)){

View File

@ -47,10 +47,8 @@ static inline void bmSendRsp(SRpcMsg *pMsg, int32_t code) {
static void bmProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void bmProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SBnodeMgmt *pMgmt = pInfo->ahandle; SBnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
dTrace("msg:%p, get from bnode-monitor queue", pMsg); dTrace("msg:%p, get from bnode-monitor queue", pMsg);
SRpcMsg *pRpc = pMsg;
int32_t code = -1;
if (pMsg->msgType == TDMT_MON_BM_INFO) { if (pMsg->msgType == TDMT_MON_BM_INFO) {
code = bmProcessGetMonBmInfoReq(pMgmt, pMsg); code = bmProcessGetMonBmInfoReq(pMgmt, pMsg);
@ -58,13 +56,13 @@ static void bmProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
} }
if (pRpc->msgType & 1U) { if (IsReq(pMsg)) {
if (code != 0 && terrno != 0) code = terrno; if (code != 0 && terrno != 0) code = terrno;
bmSendRsp(pMsg, code); bmSendRsp(pMsg, code);
} }
dTrace("msg:%p, is freed, code:0x%x", pMsg, code); dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pRpc->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }

View File

@ -19,7 +19,6 @@
static void *dmStatusThreadFp(void *param) { static void *dmStatusThreadFp(void *param) {
SDnodeMgmt *pMgmt = param; SDnodeMgmt *pMgmt = param;
int64_t lastTime = taosGetTimestampMs(); int64_t lastTime = taosGetTimestampMs();
setThreadName("dnode-status"); setThreadName("dnode-status");
while (1) { while (1) {
@ -40,7 +39,6 @@ static void *dmStatusThreadFp(void *param) {
static void *dmMonitorThreadFp(void *param) { static void *dmMonitorThreadFp(void *param) {
SDnodeMgmt *pMgmt = param; SDnodeMgmt *pMgmt = param;
int64_t lastTime = taosGetTimestampMs(); int64_t lastTime = taosGetTimestampMs();
setThreadName("dnode-monitor"); setThreadName("dnode-monitor");
while (1) { while (1) {
@ -103,11 +101,9 @@ void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SDnodeMgmt *pMgmt = pInfo->ahandle; SDnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1; int32_t code = -1;
tmsg_t msgType = pMsg->msgType; dTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
bool isRequest = msgType & 1u;
dTrace("msg:%p, will be processed in dnode-mgmt queue, type:%s", pMsg, TMSG_INFO(msgType));
switch (msgType) { switch (pMsg->msgType) {
case TDMT_DND_CONFIG_DNODE: case TDMT_DND_CONFIG_DNODE:
code = dmProcessConfigReq(pMgmt, pMsg); code = dmProcessConfigReq(pMgmt, pMsg);
break; break;
@ -149,7 +145,7 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
break; break;
} }
if (isRequest) { if (IsReq(pMsg)) {
if (code != 0 && terrno != 0) code = terrno; if (code != 0 && terrno != 0) code = terrno;
SRpcMsg rsp = { SRpcMsg rsp = {
.code = code, .code = code,

View File

@ -46,7 +46,7 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
code = mndProcessMsg(pMsg); code = mndProcessMsg(pMsg);
} }
if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) {
if (code != 0 && terrno != 0) code = terrno; if (code != 0 && terrno != 0) code = terrno;
mmSendRsp(pMsg, code); mmSendRsp(pMsg, code);
} }
@ -56,28 +56,6 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
static void mmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
tmsg_t msgType = pMsg->msgType;
bool isRequest = msgType & 1U;
dTrace("msg:%p, get from mnode-query queue", pMsg);
pMsg->info.node = pMgmt->pMnode;
code = mndProcessMsg(pMsg);
if (isRequest) {
if (pMsg->info.handle != NULL && code != 0) {
if (code != 0 && terrno != 0) code = terrno;
mmSendRsp(pMsg, code);
}
}
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) { static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) {
dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType)); dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pWorker->queue, pMsg); taosWriteQitem(pWorker->queue, pMsg);
@ -135,7 +113,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
.min = tsNumOfMnodeQueryThreads, .min = tsNumOfMnodeQueryThreads,
.max = tsNumOfMnodeQueryThreads, .max = tsNumOfMnodeQueryThreads,
.name = "mnode-query", .name = "mnode-query",
.fp = (FItem)mmProcessQueryQueue, .fp = (FItem)mmProcessQueue,
.param = pMgmt, .param = pMgmt,
}; };
if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) { if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) {

View File

@ -19,70 +19,39 @@
static inline void qmSendRsp(SRpcMsg *pMsg, int32_t code) { static inline void qmSendRsp(SRpcMsg *pMsg, int32_t code) {
SRpcMsg rsp = { SRpcMsg rsp = {
.code = code, .code = code,
.info = pMsg->info,
.pCont = pMsg->info.rsp, .pCont = pMsg->info.rsp,
.contLen = pMsg->info.rspLen, .contLen = pMsg->info.rspLen,
.info = pMsg->info,
}; };
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
static void qmProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void qmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SQnodeMgmt *pMgmt = pInfo->ahandle; SQnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
dTrace("msg:%p, get from qnode queue", pMsg);
dTrace("msg:%p, get from qnode-monitor queue", pMsg); switch (pMsg->msgType) {
SRpcMsg *pRpc = pMsg; case TDMT_MON_QM_INFO:
int32_t code = -1; code = qmProcessGetMonitorInfoReq(pMgmt, pMsg);
break;
if (pMsg->msgType == TDMT_MON_QM_INFO) { default:
code = qmProcessGetMonitorInfoReq(pMgmt, pMsg); code = qndProcessQueryMsg(pMgmt->pQnode, pMsg);
} else { break;
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
} }
if (pRpc->msgType & 1U) { if (IsReq(pMsg) && code != TSDB_CODE_ACTION_IN_PROGRESS) {
if (code != 0 && terrno != 0) code = terrno; if (code != 0 && terrno != 0) code = terrno;
qmSendRsp(pMsg, code); qmSendRsp(pMsg, code);
} }
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pRpc->pCont);
taosFreeQitem(pMsg);
}
static void qmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SQnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, get from qnode-query queue", pMsg);
SRpcMsg *pRpc = pMsg;
int32_t code = qndProcessQueryMsg(pMgmt->pQnode, pRpc);
if (pRpc->msgType & 1U && code != 0) {
qmSendRsp(pMsg, code);
}
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static void qmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SQnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, get from qnode-fetch queue", pMsg);
SRpcMsg *pRpc = pMsg;
int32_t code = qndProcessFetchMsg(pMgmt->pQnode, pRpc);
if (pRpc->msgType & 1U && code != 0) {
qmSendRsp(pMsg, code);
}
dTrace("msg:%p, is freed, code:0x%x", pMsg, code); dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
static int32_t qmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) { static int32_t qmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) {
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pWorker->queue, pMsg); taosWriteQitem(pWorker->queue, pMsg);
return 0; return 0;
} }
@ -101,9 +70,7 @@ int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) { static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) {
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
if (pMsg == NULL) { if (pMsg == NULL) return -1;
return -1;
}
dTrace("msg:%p, create and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); dTrace("msg:%p, create and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
memcpy(pMsg, pRpc, sizeof(SRpcMsg)); memcpy(pMsg, pRpc, sizeof(SRpcMsg));
@ -141,7 +108,7 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
.min = tsNumOfVnodeQueryThreads, .min = tsNumOfVnodeQueryThreads,
.max = tsNumOfVnodeQueryThreads, .max = tsNumOfVnodeQueryThreads,
.name = "qnode-query", .name = "qnode-query",
.fp = (FItem)qmProcessQueryQueue, .fp = (FItem)qmProcessQueue,
.param = pMgmt, .param = pMgmt,
}; };
@ -154,7 +121,7 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
.min = tsNumOfQnodeFetchThreads, .min = tsNumOfQnodeFetchThreads,
.max = tsNumOfQnodeFetchThreads, .max = tsNumOfQnodeFetchThreads,
.name = "qnode-fetch", .name = "qnode-fetch",
.fp = (FItem)qmProcessFetchQueue, .fp = (FItem)qmProcessQueue,
.param = pMgmt, .param = pMgmt,
}; };
@ -167,7 +134,7 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
.min = 1, .min = 1,
.max = 1, .max = 1,
.name = "qnode-monitor", .name = "qnode-monitor",
.fp = (FItem)qmProcessMonitorQueue, .fp = (FItem)qmProcessQueue,
.param = pMgmt, .param = pMgmt,
}; };
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {

View File

@ -28,10 +28,8 @@ static inline void smSendRsp(SRpcMsg *pMsg, int32_t code) {
static void smProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void smProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SSnodeMgmt *pMgmt = pInfo->ahandle; SSnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
dTrace("msg:%p, get from snode-monitor queue", pMsg); dTrace("msg:%p, get from snode-monitor queue", pMsg);
SRpcMsg *pRpc = pMsg;
int32_t code = -1;
if (pMsg->msgType == TDMT_MON_SM_INFO) { if (pMsg->msgType == TDMT_MON_SM_INFO) {
code = smProcessGetMonitorInfoReq(pMgmt, pMsg); code = smProcessGetMonitorInfoReq(pMgmt, pMsg);
@ -39,13 +37,13 @@ static void smProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
} }
if (pRpc->msgType & 1U) { if (IsReq(pMsg)) {
if (code != 0 && terrno != 0) code = terrno; if (code != 0 && terrno != 0) code = terrno;
smSendRsp(pMsg, code); smSendRsp(pMsg, code);
} }
dTrace("msg:%p, is freed, code:0x%x", pMsg, code); dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pRpc->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }

View File

@ -29,7 +29,7 @@ static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) {
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void vmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SVnodeMgmt *pMgmt = pInfo->ahandle; SVnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1; int32_t code = -1;
dTrace("msg:%p, get from vnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); dTrace("msg:%p, get from vnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
@ -92,7 +92,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle; SVnodeObj *pVnode = pInfo->ahandle;
SArray * pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
if (pArray == NULL) { if (pArray == NULL) {
dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr()); dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());
return; return;
@ -222,8 +222,7 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
} }
static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) { static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) {
SRpcMsg * pRpc = pMsg; SMsgHead *pHead = pMsg->pCont;
SMsgHead *pHead = pRpc->pCont;
int32_t code = 0; int32_t code = 0;
pHead->contLen = ntohl(pHead->contLen); pHead->contLen = ntohl(pHead->contLen);
@ -237,23 +236,23 @@ static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType
switch (qtype) { switch (qtype) {
case QUERY_QUEUE: case QUERY_QUEUE:
dTrace("msg:%p, put into vnode-query worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); dTrace("msg:%p, put into vnode-query worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pVnode->pQueryQ, pMsg); taosWriteQitem(pVnode->pQueryQ, pMsg);
break; break;
case FETCH_QUEUE: case FETCH_QUEUE:
dTrace("msg:%p, put into vnode-fetch worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); dTrace("msg:%p, put into vnode-fetch worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pVnode->pFetchQ, pMsg); taosWriteQitem(pVnode->pFetchQ, pMsg);
break; break;
case WRITE_QUEUE: case WRITE_QUEUE:
dTrace("msg:%p, put into vnode-write worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); dTrace("msg:%p, put into vnode-write worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pVnode->pWriteQ, pMsg); taosWriteQitem(pVnode->pWriteQ, pMsg);
break; break;
case SYNC_QUEUE: case SYNC_QUEUE:
dTrace("msg:%p, put into vnode-sync worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); dTrace("msg:%p, put into vnode-sync worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pVnode->pSyncQ, pMsg); taosWriteQitem(pVnode->pSyncQ, pMsg);
break; break;
case MERGE_QUEUE: case MERGE_QUEUE:
dTrace("msg:%p, put into vnode-merge worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); dTrace("msg:%p, put into vnode-merge worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pVnode->pMergeQ, pMsg); taosWriteQitem(pVnode->pMergeQ, pMsg);
break; break;
default: default:
@ -301,7 +300,7 @@ int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
} }
static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType qtype) { static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType qtype) {
SMsgHead * pHead = pRpc->pCont; SMsgHead *pHead = pRpc->pCont;
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) return -1; if (pVnode == NULL) return -1;
@ -469,7 +468,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
.min = 1, .min = 1,
.max = 1, .max = 1,
.name = "vnode-mgmt", .name = "vnode-mgmt",
.fp = (FItem)vmProcessMgmtMonitorQueue, .fp = (FItem)vmProcessQueue,
.param = pMgmt, .param = pMgmt,
}; };
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) { if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) {
@ -481,7 +480,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
.min = 1, .min = 1,
.max = 1, .max = 1,
.name = "vnode-monitor", .name = "vnode-monitor",
.fp = (FItem)vmProcessMgmtMonitorQueue, .fp = (FItem)vmProcessQueue,
.param = pMgmt, .param = pMgmt,
}; };
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {

View File

@ -137,7 +137,6 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper);
void dmSetStatus(SDnode *pDnode, EDndRunStatus stype); void dmSetStatus(SDnode *pDnode, EDndRunStatus stype);
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg); void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg);
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg); void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg);
void dmProcessFetchRsp(SRpcMsg *pMsg);
// dmNodes.c // dmNodes.c
int32_t dmOpenNode(SMgmtWrapper *pWrapper); int32_t dmOpenNode(SMgmtWrapper *pWrapper);

View File

@ -314,8 +314,3 @@ void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg) {
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
} }
void dmProcessFetchRsp(SRpcMsg *pMsg) {
qWorkerProcessFetchRsp(NULL, NULL, pMsg);
// rpcFreeCont(pMsg->pCont);
}

View File

@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dmMgmt.h" #include "dmMgmt.h"
#include "qworker.h"
static void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet); static void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet);
static void dmSendRsp(SRpcMsg *pMsg); static void dmSendRsp(SRpcMsg *pMsg);
@ -61,7 +62,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
dmProcessNetTestReq(pDnode, pRpc); dmProcessNetTestReq(pDnode, pRpc);
return; return;
} else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) { } else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) {
dmProcessFetchRsp(pRpc); qWorkerProcessFetchRsp(NULL, NULL, pRpc);
return; return;
} else { } else {
} }

View File

@ -304,10 +304,10 @@ static int32_t mndProcessCreateBnodeReq(SRpcMsg *pReq) {
} }
code = mndCreateBnode(pMnode, pReq, pDnode, &createReq); code = mndCreateBnode(pMnode, pReq, pDnode, &createReq);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("bnode:%d, failed to create since %s", createReq.dnodeId, terrstr()); mError("bnode:%d, failed to create since %s", createReq.dnodeId, terrstr());
} }
@ -414,10 +414,10 @@ static int32_t mndProcessDropBnodeReq(SRpcMsg *pReq) {
} }
code = mndDropBnode(pMnode, pReq, pObj); code = mndDropBnode(pMnode, pReq, pObj);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("bnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr()); mError("bnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
} }

View File

@ -511,7 +511,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER;
} }
code = TSDB_CODE_MND_ACTION_IN_PROGRESS; code = TSDB_CODE_ACTION_IN_PROGRESS;
SUBSCRIBE_OVER: SUBSCRIBE_OVER:
mndTransDrop(pTrans); mndTransDrop(pTrans);

View File

@ -525,7 +525,6 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
dbObj.cfg.numOfRetensions = pCreate->numOfRetensions; dbObj.cfg.numOfRetensions = pCreate->numOfRetensions;
dbObj.cfg.pRetensions = pCreate->pRetensions; dbObj.cfg.pRetensions = pCreate->pRetensions;
pCreate->pRetensions = NULL;
mndSetDefaultDbCfg(&dbObj.cfg); mndSetDefaultDbCfg(&dbObj.cfg);
@ -605,10 +604,10 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
} }
code = mndCreateDb(pMnode, pReq, &createReq, pUser); code = mndCreateDb(pMnode, pReq, &createReq, pUser);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("db:%s, failed to create since %s", createReq.db, terrstr()); mError("db:%s, failed to create since %s", createReq.db, terrstr());
} }
@ -839,10 +838,10 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) {
dbObj.cfgVersion++; dbObj.cfgVersion++;
dbObj.updateTime = taosGetTimestampMs(); dbObj.updateTime = taosGetTimestampMs();
code = mndAlterDb(pMnode, pReq, pDb, &dbObj); code = mndAlterDb(pMnode, pReq, pDb, &dbObj);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("db:%s, failed to alter since %s", alterReq.db, terrstr()); mError("db:%s, failed to alter since %s", alterReq.db, terrstr());
} }
@ -1110,10 +1109,10 @@ static int32_t mndProcessDropDbReq(SRpcMsg *pReq) {
} }
code = mndDropDb(pMnode, pReq, pDb); code = mndDropDb(pMnode, pReq, pDb);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("db:%s, failed to drop since %s", dropReq.db, terrstr()); mError("db:%s, failed to drop since %s", dropReq.db, terrstr());
} }

View File

@ -504,10 +504,10 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
} }
code = mndCreateDnode(pMnode, pReq, &createReq); code = mndCreateDnode(pMnode, pReq, &createReq);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
CREATE_DNODE_OVER: CREATE_DNODE_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, terrstr()); mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, terrstr());
} }
@ -585,10 +585,10 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
} }
code = mndDropDnode(pMnode, pReq, pDnode); code = mndDropDnode(pMnode, pReq, pDnode);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
DROP_DNODE_OVER: DROP_DNODE_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr()); mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
} }

View File

@ -330,10 +330,10 @@ static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) {
} }
code = mndCreateFunc(pMnode, pReq, &createReq); code = mndCreateFunc(pMnode, pReq, &createReq);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("func:%s, failed to create since %s", createReq.name, terrstr()); mError("func:%s, failed to create since %s", createReq.name, terrstr());
} }
@ -386,10 +386,10 @@ static int32_t mndProcessDropFuncReq(SRpcMsg *pReq) {
} }
code = mndDropFunc(pMnode, pReq, pFunc); code = mndDropFunc(pMnode, pReq, pFunc);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("func:%s, failed to drop since %s", dropReq.name, terrstr()); mError("func:%s, failed to drop since %s", dropReq.name, terrstr());
} }

View File

@ -402,10 +402,10 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
} }
code = mndCreateMnode(pMnode, pReq, pDnode, &createReq); code = mndCreateMnode(pMnode, pReq, pDnode, &createReq);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("mnode:%d, failed to create since %s", createReq.dnodeId, terrstr()); mError("mnode:%d, failed to create since %s", createReq.dnodeId, terrstr());
} }
@ -574,10 +574,10 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
} }
code = mndDropMnode(pMnode, pReq, pObj); code = mndDropMnode(pMnode, pReq, pObj);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("mnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr()); mError("mnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
} }

View File

@ -205,7 +205,7 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) {
} }
mndTransDrop(pTrans); mndTransDrop(pTrans);
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_ACTION_IN_PROGRESS;
} }
static int32_t mndOffsetActionInsert(SSdb *pSdb, SMqOffsetObj *pOffset) { static int32_t mndOffsetActionInsert(SSdb *pSdb, SMqOffsetObj *pOffset) {

View File

@ -128,7 +128,8 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType
} }
static void mndFreeConn(SConnObj *pConn) { static void mndFreeConn(SConnObj *pConn) {
taosMemoryFreeClear(pConn->pQueries); taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn); mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn);
} }
@ -396,6 +397,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
if (NULL == hbRsp.info) { if (NULL == hbRsp.info) {
mError("taosArrayInit %d rsp kv failed", kvNum); mError("taosArrayInit %d rsp kv failed", kvNum);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tFreeClientHbRsp(&hbRsp);
return -1; return -1;
} }
@ -453,6 +455,7 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
SClientHbBatchReq batchReq = {0}; SClientHbBatchReq batchReq = {0};
if (tDeserializeSClientHbBatchReq(pReq->pCont, pReq->contLen, &batchReq) != 0) { if (tDeserializeSClientHbBatchReq(pReq->pCont, pReq->contLen, &batchReq) != 0) {
taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
@ -479,18 +482,7 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
void *buf = rpcMallocCont(tlen); void *buf = rpcMallocCont(tlen);
tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp); tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);
int32_t rspNum = (int32_t)taosArrayGetSize(batchRsp.rsps); tFreeClientHbBatchRsp(&batchRsp);
for (int32_t i = 0; i < rspNum; ++i) {
SClientHbRsp *rsp = taosArrayGet(batchRsp.rsps, i);
int32_t kvNum = (rsp->info) ? taosArrayGetSize(rsp->info) : 0;
for (int32_t n = 0; n < kvNum; ++n) {
SKv *kv = taosArrayGet(rsp->info, n);
taosMemoryFreeClear(kv->value);
}
taosArrayDestroy(rsp->info);
}
taosArrayDestroy(batchRsp.rsps);
pReq->info.rspLen = tlen; pReq->info.rspLen = tlen;
pReq->info.rsp = buf; pReq->info.rsp = buf;

View File

@ -306,10 +306,10 @@ static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) {
} }
code = mndCreateQnode(pMnode, pReq, pDnode, &createReq); code = mndCreateQnode(pMnode, pReq, pDnode, &createReq);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("qnode:%d, failed to create since %s", createReq.dnodeId, terrstr()); mError("qnode:%d, failed to create since %s", createReq.dnodeId, terrstr());
} }
@ -416,10 +416,10 @@ static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) {
} }
code = mndDropQnode(pMnode, pReq, pObj); code = mndDropQnode(pMnode, pReq, pObj);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("qnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr()); mError("qnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
} }

View File

@ -18,37 +18,35 @@
#include "mndMnode.h" #include "mndMnode.h"
#include "qworker.h" #include "qworker.h"
int32_t mndProcessQueryMsg(SRpcMsg *pReq) { int32_t mndProcessQueryMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pReq->info.node; int32_t code = -1;
SMnode *pMnode = pMsg->info.node;
SReadHandle handle = {.mnd = pMnode, .pMsgCb = &pMnode->msgCb}; SReadHandle handle = {.mnd = pMnode, .pMsgCb = &pMnode->msgCb};
mTrace("msg:%p, in query queue is processing", pReq); mTrace("msg:%p, in query queue is processing", pMsg);
switch (pReq->msgType) {
case TDMT_VND_QUERY:
return qWorkerProcessQueryMsg(&handle, pMnode->pQuery, pReq);
case TDMT_VND_QUERY_CONTINUE:
return qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pReq);
default:
mError("unknown msg type:%d in query queue", pReq->msgType);
return TSDB_CODE_VND_APP_ERROR;
}
}
int32_t mndProcessFetchMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
mTrace("msg:%p, in fetch queue is processing", pMsg);
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_QUERY:
code = qWorkerProcessQueryMsg(&handle, pMnode->pQuery, pMsg);
break;
case TDMT_VND_QUERY_CONTINUE:
code = qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pMsg);
break;
case TDMT_VND_FETCH: case TDMT_VND_FETCH:
return qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg); code = qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg);
break;
case TDMT_VND_DROP_TASK: case TDMT_VND_DROP_TASK:
return qWorkerProcessDropMsg(pMnode, pMnode->pQuery, pMsg); code = qWorkerProcessDropMsg(pMnode, pMnode->pQuery, pMsg);
break;
case TDMT_VND_QUERY_HEARTBEAT: case TDMT_VND_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pMnode, pMnode->pQuery, pMsg); code = qWorkerProcessHbMsg(pMnode, pMnode->pQuery, pMsg);
break;
default: default:
mError("unknown msg type:%d in fetch queue", pMsg->msgType); terrno = TSDB_CODE_VND_APP_ERROR;
return TSDB_CODE_VND_APP_ERROR; mError("unknown msg type:%d in query queue", pMsg->msgType);
} }
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
return code;
} }
int32_t mndInitQuery(SMnode *pMnode) { int32_t mndInitQuery(SMnode *pMnode) {
@ -59,9 +57,9 @@ int32_t mndInitQuery(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_QUERY, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_VND_QUERY, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_VND_QUERY_CONTINUE, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_VND_QUERY_CONTINUE, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_VND_FETCH, mndProcessFetchMsg); mndSetMsgHandle(pMnode, TDMT_VND_FETCH, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TASK, mndProcessFetchMsg); mndSetMsgHandle(pMnode, TDMT_VND_DROP_TASK, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_VND_QUERY_HEARTBEAT, mndProcessFetchMsg); mndSetMsgHandle(pMnode, TDMT_VND_QUERY_HEARTBEAT, mndProcessQueryMsg);
return 0; return 0;
} }

View File

@ -562,10 +562,10 @@ static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq) {
} }
code = mndCreateSma(pMnode, pReq, &createReq, pDb, pStb); code = mndCreateSma(pMnode, pReq, &createReq, pDb, pStb);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("sma:%s, failed to create since %s", createReq.name, terrstr(terrno)); mError("sma:%s, failed to create since %s", createReq.name, terrstr(terrno));
} }
@ -706,10 +706,10 @@ static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq) {
} }
code = mndDropSma(pMnode, pReq, pDb, pSma); code = mndDropSma(pMnode, pReq, pDb, pSma);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("sma:%s, failed to drop since %s", dropReq.name, terrstr()); mError("sma:%s, failed to drop since %s", dropReq.name, terrstr());
} }

View File

@ -312,10 +312,10 @@ static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq) {
} }
code = mndCreateSnode(pMnode, pReq, pDnode, &createReq); code = mndCreateSnode(pMnode, pReq, pDnode, &createReq);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("snode:%d, failed to create since %s", createReq.dnodeId, terrstr()); mError("snode:%d, failed to create since %s", createReq.dnodeId, terrstr());
return -1; return -1;
} }
@ -424,10 +424,10 @@ static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq) {
} }
code = mndDropSnode(pMnode, pReq, pObj); code = mndDropSnode(pMnode, pReq, pObj);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("snode:%d, failed to drop since %s", dropReq.dnodeId, terrstr()); mError("snode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
} }

View File

@ -827,10 +827,10 @@ static int32_t mndProcessMCreateStbReq(SRpcMsg *pReq) {
} }
code = mndCreateStb(pMnode, pReq, &createReq, pDb); code = mndCreateStb(pMnode, pReq, &createReq, pDb);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stb:%s, failed to create since %s", createReq.name, terrstr()); mError("stb:%s, failed to create since %s", createReq.name, terrstr());
} }
@ -1334,10 +1334,10 @@ static int32_t mndProcessMAlterStbReq(SRpcMsg *pReq) {
} }
code = mndAlterStb(pMnode, pReq, &alterReq, pDb, pStb); code = mndAlterStb(pMnode, pReq, &alterReq, pDb, pStb);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stb:%s, failed to alter since %s", alterReq.name, terrstr()); mError("stb:%s, failed to alter since %s", alterReq.name, terrstr());
} }
@ -1475,10 +1475,10 @@ static int32_t mndProcessMDropStbReq(SRpcMsg *pReq) {
} }
code = mndDropStb(pMnode, pReq, pDb, pStb); code = mndDropStb(pMnode, pReq, pDb, pStb);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stb:%s, failed to drop since %s", dropReq.name, terrstr()); mError("stb:%s, failed to drop since %s", dropReq.name, terrstr());
} }
@ -1642,6 +1642,8 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbVersions, int
if (pStbVersion->sversion != metaRsp.sversion) { if (pStbVersion->sversion != metaRsp.sversion) {
taosArrayPush(batchMetaRsp.pArray, &metaRsp); taosArrayPush(batchMetaRsp.pArray, &metaRsp);
} else {
tFreeSTableMetaRsp(&metaRsp);
} }
} }
@ -1660,6 +1662,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbVersions, int
} }
tSerializeSTableMetaBatchRsp(pRsp, rspLen, &batchMetaRsp); tSerializeSTableMetaBatchRsp(pRsp, rspLen, &batchMetaRsp);
tFreeSTableMetaBatchRsp(&batchMetaRsp);
*ppRsp = pRsp; *ppRsp = pRsp;
*pRspLen = rspLen; *pRspLen = rspLen;
return 0; return 0;

View File

@ -472,10 +472,10 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
} }
code = mndCreateStream(pMnode, pReq, &createStreamReq, pDb); code = mndCreateStream(pMnode, pReq, &createStreamReq, pDb);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
CREATE_STREAM_OVER: CREATE_STREAM_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
} }

View File

@ -457,10 +457,10 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
} }
code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb); code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
CREATE_TOPIC_OVER: CREATE_TOPIC_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr()); mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
} }
@ -547,7 +547,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
return -1; return -1;
} }
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_ACTION_IN_PROGRESS;
} }
static int32_t mndProcessDropTopicInRsp(SRpcMsg *pRsp) { static int32_t mndProcessDropTopicInRsp(SRpcMsg *pRsp) {

View File

@ -1035,13 +1035,13 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
} }
} else { } else {
mDebug("trans:%d, %d of %d actions executed", pTrans->id, numOfReceived, numOfActions); mDebug("trans:%d, %d of %d actions executed", pTrans->id, numOfReceived, numOfActions);
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_ACTION_IN_PROGRESS;
} }
} }
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions); int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions);
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to execute redoActions since:%s, code:0x%x", terrstr(), terrno); mError("failed to execute redoActions since:%s, code:0x%x", terrstr(), terrno);
} }
return code; return code;
@ -1049,7 +1049,7 @@ static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) {
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions); int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions);
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to execute undoActions since %s", terrstr()); mError("failed to execute undoActions since %s", terrstr());
} }
return code; return code;
@ -1088,7 +1088,7 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
pTrans->stage = TRN_STAGE_COMMIT; pTrans->stage = TRN_STAGE_COMMIT;
mDebug("trans:%d, stage from redoAction to commit", pTrans->id); mDebug("trans:%d, stage from redoAction to commit", pTrans->id);
continueExec = true; continueExec = true;
} else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("trans:%d, stage keep on redoAction since %s", pTrans->id, tstrerror(code)); mDebug("trans:%d, stage keep on redoAction since %s", pTrans->id, tstrerror(code));
continueExec = false; continueExec = false;
} else { } else {
@ -1176,7 +1176,7 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
pTrans->stage = TRN_STAGE_UNDO_LOG; pTrans->stage = TRN_STAGE_UNDO_LOG;
mDebug("trans:%d, stage from undoAction to undoLog", pTrans->id); mDebug("trans:%d, stage from undoAction to undoLog", pTrans->id);
continueExec = true; continueExec = true;
} else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code)); mDebug("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code));
continueExec = false; continueExec = false;
} else { } else {

View File

@ -331,10 +331,10 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) {
} }
code = mndCreateUser(pMnode, pOperUser->acct, &createReq, pReq); code = mndCreateUser(pMnode, pOperUser->acct, &createReq, pReq);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("user:%s, failed to create since %s", createReq.user, terrstr()); mError("user:%s, failed to create since %s", createReq.user, terrstr());
} }
@ -536,10 +536,10 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
} }
code = mndAlterUser(pMnode, pUser, &newUser, pReq); code = mndAlterUser(pMnode, pUser, &newUser, pReq);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("user:%s, failed to alter since %s", alterReq.user, terrstr()); mError("user:%s, failed to alter since %s", alterReq.user, terrstr());
} }
@ -613,10 +613,10 @@ static int32_t mndProcessDropUserReq(SRpcMsg *pReq) {
} }
code = mndDropUser(pMnode, pReq, pUser); code = mndDropUser(pMnode, pReq, pUser);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("user:%s, failed to drop since %s", dropReq.user, terrstr()); mError("user:%s, failed to drop since %s", dropReq.user, terrstr());
} }

View File

@ -367,7 +367,7 @@ int32_t mndProcessMsg(SRpcMsg *pMsg) {
} }
int32_t code = (*fp)(pMsg); int32_t code = (*fp)(pMsg);
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
terrno = code; terrno = code;
mTrace("msg:%p, in progress, app:%p", pMsg, ahandle); mTrace("msg:%p, in progress, app:%p", pMsg, ahandle);
} else if (code != 0) { } else if (code != 0) {

View File

@ -43,44 +43,49 @@ void qndClose(SQnode *pQnode) {
int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; } int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; }
int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) { int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) {
qTrace("message in qnode query queue is processing"); int32_t code = -1;
SReadHandle handle = {.pMsgCb = &pQnode->msgCb}; SReadHandle handle = {.pMsgCb = &pQnode->msgCb};
qTrace("message in qnode queue is processing");
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_QUERY: { case TDMT_VND_QUERY:
return qWorkerProcessQueryMsg(&handle, pQnode->pQuery, pMsg); code = qWorkerProcessQueryMsg(&handle, pQnode->pQuery, pMsg);
} break;
case TDMT_VND_QUERY_CONTINUE: case TDMT_VND_QUERY_CONTINUE:
return qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg); code = qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg);
default: break;
qError("unknown msg type:%d in query queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
}
}
int32_t qndProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg) {
qTrace("message in fetch queue is processing");
switch (pMsg->msgType) {
case TDMT_VND_FETCH: case TDMT_VND_FETCH:
return qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg); code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg);
break;
case TDMT_VND_FETCH_RSP: case TDMT_VND_FETCH_RSP:
return qWorkerProcessFetchRsp(pQnode, pQnode->pQuery, pMsg); code = qWorkerProcessFetchRsp(pQnode, pQnode->pQuery, pMsg);
break;
case TDMT_VND_RES_READY: case TDMT_VND_RES_READY:
return qWorkerProcessReadyMsg(pQnode, pQnode->pQuery, pMsg); code = qWorkerProcessReadyMsg(pQnode, pQnode->pQuery, pMsg);
break;
case TDMT_VND_TASKS_STATUS: case TDMT_VND_TASKS_STATUS:
return qWorkerProcessStatusMsg(pQnode, pQnode->pQuery, pMsg); code = qWorkerProcessStatusMsg(pQnode, pQnode->pQuery, pMsg);
break;
case TDMT_VND_CANCEL_TASK: case TDMT_VND_CANCEL_TASK:
return qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg); code = qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg);
break;
case TDMT_VND_DROP_TASK: case TDMT_VND_DROP_TASK:
return qWorkerProcessDropMsg(pQnode, pQnode->pQuery, pMsg); code = qWorkerProcessDropMsg(pQnode, pQnode->pQuery, pMsg);
break;
case TDMT_VND_TABLE_META: case TDMT_VND_TABLE_META:
// return vnodeGetTableMeta(pQnode, pMsg); // code = vnodeGetTableMeta(pQnode, pMsg);
// break;
case TDMT_VND_CONSUME: case TDMT_VND_CONSUME:
// return tqProcessConsumeReq(pQnode->pTq, pMsg); // code = tqProcessConsumeReq(pQnode->pTq, pMsg);
// break;
case TDMT_VND_QUERY_HEARTBEAT: case TDMT_VND_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pQnode, pQnode->pQuery, pMsg); code = qWorkerProcessHbMsg(pQnode, pQnode->pQuery, pMsg);
break;
default: default:
qError("unknown msg type:%d in fetch queue", pMsg->msgType); qError("unknown msg type:%d in qnode queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR; terrno = TSDB_CODE_VND_APP_ERROR;
} }
if (code == 0) return TSDB_CODE_ACTION_IN_PROGRESS;
return code;
} }

View File

@ -56,8 +56,8 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
if (tDecodeCStr(pCoder, &pME->name) < 0) return -1; if (tDecodeCStr(pCoder, &pME->name) < 0) return -1;
if (pME->type == TSDB_SUPER_TABLE) { if (pME->type == TSDB_SUPER_TABLE) {
if (tDecodeSSchemaWrapper(pCoder, &pME->stbEntry.schema) < 0) return -1; if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schema) < 0) return -1;
if (tDecodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaTag) < 0) return -1; if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaTag) < 0) return -1;
} else if (pME->type == TSDB_CHILD_TABLE) { } else if (pME->type == TSDB_CHILD_TABLE) {
if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1; if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1; if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1;
@ -67,7 +67,7 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1; if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1; if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1; if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1;
if (tDecodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1; if (tDecodeSSchemaWrapperEx(pCoder, &pME->ntbEntry.schema) < 0) return -1;
} else if (pME->type == TSDB_TSMA_TABLE) { } else if (pME->type == TSDB_TSMA_TABLE) {
pME->smaEntry.tsma = tDecoderMalloc(pCoder, sizeof(STSma)); pME->smaEntry.tsma = tDecoderMalloc(pCoder, sizeof(STSma));
if (!pME->smaEntry.tsma) { if (!pME->smaEntry.tsma) {

View File

@ -562,7 +562,8 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
// search table.db // search table.db
TBC *pTbDbc = NULL; TBC *pTbDbc = NULL;
SDecoder dc = {0}; SDecoder dc1 = {0};
SDecoder dc2 = {0};
/* get ctbEntry */ /* get ctbEntry */
tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn); tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
@ -572,18 +573,16 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
ctbEntry.pBuf = taosMemoryMalloc(nData); ctbEntry.pBuf = taosMemoryMalloc(nData);
memcpy(ctbEntry.pBuf, pData, nData); memcpy(ctbEntry.pBuf, pData, nData);
tDecoderInit(&dc, ctbEntry.pBuf, nData); tDecoderInit(&dc1, ctbEntry.pBuf, nData);
metaDecodeEntry(&dc, &ctbEntry); metaDecodeEntry(&dc1, &ctbEntry);
tDecoderClear(&dc);
/* get stbEntry*/ /* get stbEntry*/
tdbTbGet(pMeta->pUidIdx, &ctbEntry.ctbEntry.suid, sizeof(tb_uid_t), &pVal, &nVal); tdbTbGet(pMeta->pUidIdx, &ctbEntry.ctbEntry.suid, sizeof(tb_uid_t), &pVal, &nVal);
tdbTbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = *(int64_t *)pVal}), sizeof(STbDbKey), tdbTbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = *(int64_t *)pVal}), sizeof(STbDbKey),
(void **)&stbEntry.pBuf, &nVal); (void **)&stbEntry.pBuf, &nVal);
tdbFree(pVal); tdbFree(pVal);
tDecoderInit(&dc, stbEntry.pBuf, nVal); tDecoderInit(&dc2, stbEntry.pBuf, nVal);
metaDecodeEntry(&dc, &stbEntry); metaDecodeEntry(&dc2, &stbEntry);
tDecoderClear(&dc);
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag; SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
SSchema *pColumn = NULL; SSchema *pColumn = NULL;
@ -638,6 +637,8 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
// save to uid.idx // save to uid.idx
tdbTbUpsert(pMeta->pUidIdx, &ctbEntry.uid, sizeof(tb_uid_t), &version, sizeof(version), &pMeta->txn); tdbTbUpsert(pMeta->pUidIdx, &ctbEntry.uid, sizeof(tb_uid_t), &version, sizeof(version), &pMeta->txn);
tDecoderClear(&dc1);
tDecoderClear(&dc2);
if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf); if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
if (stbEntry.pBuf) tdbFree(stbEntry.pBuf); if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
tdbTbcClose(pTbDbc); tdbTbcClose(pTbDbc);
@ -645,6 +646,8 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
return 0; return 0;
_err: _err:
tDecoderClear(&dc1);
tDecoderClear(&dc2);
if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf); if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
if (stbEntry.pBuf) tdbFree(stbEntry.pBuf); if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
tdbTbcClose(pTbDbc); tdbTbcClose(pTbDbc);

View File

@ -132,6 +132,7 @@ int32_t smaClose(SSma *pSma) {
if SMA_RSMA_TSDB0 (pSma) tsdbClose(&SMA_RSMA_TSDB0(pSma)); if SMA_RSMA_TSDB0 (pSma) tsdbClose(&SMA_RSMA_TSDB0(pSma));
if SMA_RSMA_TSDB1 (pSma) tsdbClose(&SMA_RSMA_TSDB1(pSma)); if SMA_RSMA_TSDB1 (pSma) tsdbClose(&SMA_RSMA_TSDB1(pSma));
if SMA_RSMA_TSDB2 (pSma) tsdbClose(&SMA_RSMA_TSDB2(pSma)); if SMA_RSMA_TSDB2 (pSma) tsdbClose(&SMA_RSMA_TSDB2(pSma));
taosMemoryFree(pSma);
} }
return 0; return 0;
} }

View File

@ -57,6 +57,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
void tqClose(STQ* pTq) { void tqClose(STQ* pTq) {
if (pTq) { if (pTq) {
taosMemoryFreeClear(pTq->path); taosMemoryFreeClear(pTq->path);
taosHashCleanup(pTq->execs);
taosHashCleanup(pTq->pStreamTasks);
taosHashCleanup(pTq->pushMgr);
taosMemoryFree(pTq); taosMemoryFree(pTq);
} }
// TODO // TODO
@ -409,9 +412,9 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
pTopic->buffer.output[j].status = 0; pTopic->buffer.output[j].status = 0;
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = { SReadHandle handle = {
.reader = pReadHandle, .reader = pReadHandle,
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb, .pMsgCb = &pTq->pVnode->msgCb,
}; };
pTopic->buffer.output[j].pReadHandle = pReadHandle; pTopic->buffer.output[j].pReadHandle = pReadHandle;
pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle); pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle);
@ -1000,10 +1003,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
for (int32_t i = 0; i < parallel; i++) { for (int32_t i = 0; i < parallel; i++) {
STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = { SReadHandle handle = {
.reader = pStreamReader, .reader = pStreamReader,
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb, .pMsgCb = &pTq->pVnode->msgCb,
.vnode = pTq->pVnode, .vnode = pTq->pVnode,
}; };
pTask->exec.runners[i].inputHandle = pStreamReader; pTask->exec.runners[i].inputHandle = pStreamReader;
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);

View File

@ -477,6 +477,7 @@ static int tsdbCreateCommitIters(SCommitH *pCommith) {
pCommitIter->pTable->pSchema = pTSchema; // metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, 0); pCommitIter->pTable->pSchema = pTSchema; // metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, 0);
} }
} }
tSkipListDestroyIter(pSlIter);
return 0; return 0;
} }

View File

@ -147,9 +147,6 @@ _err:
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in vnode query queue is processing"); vTrace("message in vnode query queue is processing");
#if 0
SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
#endif
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_QUERY: case TDMT_VND_QUERY:

View File

@ -2784,6 +2784,8 @@ int32_t catalogRemoveTableMeta(SCatalog *pCtg, const SName *pTableName) {
CTG_ERR_JRET(ctgPushRmTblMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, true)); CTG_ERR_JRET(ctgPushRmTblMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, true));
} }
ctgDebug("table meta %s.%s removed", dbFName, pTableName->tname);
_return: _return:
taosMemoryFreeClear(tblMeta); taosMemoryFreeClear(tblMeta);
@ -2958,7 +2960,11 @@ int32_t catalogChkTbMetaVersion(SCatalog *pCtg, void *pTrans, const SEpSet *pMgm
int32_t sver = 0; int32_t sver = 0;
int32_t tbNum = taosArrayGetSize(pTables); int32_t tbNum = taosArrayGetSize(pTables);
for (int32_t i = 0; i < tbNum; ++i) { for (int32_t i = 0; i < tbNum; ++i) {
STbSVersion *pTb = (STbSVersion *)taosArrayGet(pTables, i); STbSVersion* pTb = (STbSVersion*)taosArrayGet(pTables, i);
if (NULL == pTb->tbFName || 0 == pTb->tbFName[0]) {
continue;
}
tNameFromString(&name, pTb->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&name, pTb->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
if (CTG_IS_SYS_DBNAME(name.dbname)) { if (CTG_IS_SYS_DBNAME(name.dbname)) {

View File

@ -827,7 +827,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
SKVRow row = tdGetKVRowFromBuilder(&pCxt->tagsBuilder); SKVRow row = tdGetKVRowFromBuilder(&pCxt->tagsBuilder);
if (NULL == row) { if (NULL == row) {
return buildInvalidOperationMsg(&pCxt->msg, "tag value expected"); return buildInvalidOperationMsg(&pCxt->msg, "out of memory");
} }
tdSortKVRowByColIdx(row); tdSortKVRowByColIdx(row);
@ -1085,6 +1085,10 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
// no data in the sql string anymore. // no data in the sql string anymore.
if (sToken.n == 0) { if (sToken.n == 0) {
if (sToken.type && pCxt->pSql[0]) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid charactor in SQL", sToken.z);
}
if (0 == pCxt->totalNum && (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) { if (0 == pCxt->totalNum && (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
return buildInvalidOperationMsg(&pCxt->msg, "no data in sql"); return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");
} }
@ -1347,7 +1351,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tN
SKVRow row = tdGetKVRowFromBuilder(&tagBuilder); SKVRow row = tdGetKVRowFromBuilder(&tagBuilder);
if (NULL == row) { if (NULL == row) {
tdDestroyKVRowBuilder(&tagBuilder); tdDestroyKVRowBuilder(&tagBuilder);
return buildInvalidOperationMsg(&pBuf, "tag value expected"); return buildInvalidOperationMsg(&pBuf, "out of memory");
} }
tdSortKVRowByColIdx(row); tdSortKVRowByColIdx(row);
@ -1696,7 +1700,7 @@ static int32_t smlBuildTagRow(SArray* cols, SKVRowBuilder* tagsBuilder, SParsedD
*row = tdGetKVRowFromBuilder(tagsBuilder); *row = tdGetKVRowFromBuilder(tagsBuilder);
if (*row == NULL) { if (*row == NULL) {
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_OUT_OF_MEMORY;
} }
tdSortKVRowByColIdx(*row); tdSortKVRowByColIdx(*row);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -704,6 +704,7 @@ SToken tStrGetToken(const char* str, int32_t* i, bool isPrevOptr) {
if (t0.type == TK_NK_SEMI) { if (t0.type == TK_NK_SEMI) {
t0.n = 0; t0.n = 0;
t0.type = 0;
return t0; return t0;
} }

View File

@ -725,7 +725,11 @@ void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
qGetQueriedTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion); qGetQueriedTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion);
sprintf(ctx->tbInfo.tbFName, "%s.%s", dbFName, tbName); if (dbFName[0] && tbName[0]) {
sprintf(ctx->tbInfo.tbFName, "%s.%s", dbFName, tbName);
} else {
ctx->tbInfo.tbFName[0] = 0;
}
} }
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {

View File

@ -17,21 +17,20 @@
#include "ttime.h" #include "ttime.h"
#define DEFAULT_FALSE_POSITIVE 0.01 #define DEFAULT_FALSE_POSITIVE 0.01
#define DEFAULT_BUCKET_SIZE 1024 #define DEFAULT_BUCKET_SIZE 1024
#define ROWS_PER_MILLISECOND 1 #define ROWS_PER_MILLISECOND 1
#define MAX_NUM_SCALABLE_BF 120 #define MAX_NUM_SCALABLE_BF 120
#define MIN_NUM_SCALABLE_BF 10 #define MIN_NUM_SCALABLE_BF 10
#define DEFAULT_PREADD_BUCKET 1 #define DEFAULT_PREADD_BUCKET 1
#define MAX_INTERVAL MILLISECOND_PER_MINUTE #define MAX_INTERVAL MILLISECOND_PER_MINUTE
#define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10) #define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10)
static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) { static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
if (pInfo->numSBFs < count ) { if (pInfo->numSBFs < count) {
count = pInfo->numSBFs; count = pInfo->numSBFs;
} }
for (uint64_t i = 0; i < count; ++i) { for (uint64_t i = 0; i < count; ++i) {
SScalableBf *tsSBF = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND, SScalableBf *tsSBF = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND, DEFAULT_FALSE_POSITIVE);
DEFAULT_FALSE_POSITIVE);
taosArrayPush(pInfo->pTsSBFs, &tsSBF); taosArrayPush(pInfo->pTsSBFs, &tsSBF);
} }
} }
@ -76,7 +75,7 @@ static int64_t adjustWatermark(int64_t interval, int32_t watermark) {
return watermark; return watermark;
} }
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark) { SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark) {
return updateInfoInit(pInterval->interval, pInterval->precision, watermark); return updateInfoInit(pInterval->interval, pInterval->precision, watermark);
} }
@ -93,7 +92,7 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
uint64_t bfSize = (uint64_t)(pInfo->watermark / pInfo->interval); uint64_t bfSize = (uint64_t)(pInfo->watermark / pInfo->interval);
pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(SScalableBf)); pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(void *));
if (pInfo->pTsSBFs == NULL) { if (pInfo->pTsSBFs == NULL) {
updateInfoDestroy(pInfo); updateInfoDestroy(pInfo);
return NULL; return NULL;
@ -108,14 +107,14 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
} }
TSKEY dumy = 0; TSKEY dumy = 0;
for(uint64_t i=0; i < DEFAULT_BUCKET_SIZE; ++i) { for (uint64_t i = 0; i < DEFAULT_BUCKET_SIZE; ++i) {
taosArrayPush(pInfo->pTsBuckets, &dumy); taosArrayPush(pInfo->pTsBuckets, &dumy);
} }
pInfo->numBuckets = DEFAULT_BUCKET_SIZE; pInfo->numBuckets = DEFAULT_BUCKET_SIZE;
return pInfo; return pInfo;
} }
static SScalableBf* getSBf(SUpdateInfo *pInfo, TSKEY ts) { static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) {
if (ts <= 0) { if (ts <= 0) {
return NULL; return NULL;
} }
@ -131,24 +130,23 @@ static SScalableBf* getSBf(SUpdateInfo *pInfo, TSKEY ts) {
} }
SScalableBf *res = taosArrayGetP(pInfo->pTsSBFs, index); SScalableBf *res = taosArrayGetP(pInfo->pTsSBFs, index);
if (res == NULL) { if (res == NULL) {
res = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND, res = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND, DEFAULT_FALSE_POSITIVE);
DEFAULT_FALSE_POSITIVE);
taosArrayPush(pInfo->pTsSBFs, &res); taosArrayPush(pInfo->pTsSBFs, &res);
} }
return res; return res;
} }
bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) { bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) {
int32_t res = TSDB_CODE_FAILED; int32_t res = TSDB_CODE_FAILED;
uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets; uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
SScalableBf* pSBf = getSBf(pInfo, ts); SScalableBf *pSBf = getSBf(pInfo, ts);
// pSBf may be a null pointer // pSBf may be a null pointer
if (pSBf) { if (pSBf) {
res = tScalableBfPut(pSBf, &ts, sizeof(TSKEY)); res = tScalableBfPut(pSBf, &ts, sizeof(TSKEY));
} }
TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index); TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
if (maxTs < ts ) { if (maxTs < ts) {
taosArraySet(pInfo->pTsBuckets, index, &ts); taosArraySet(pInfo->pTsBuckets, index, &ts);
return false; return false;
} }
@ -159,7 +157,7 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) {
return false; return false;
} }
//check from tsdb api // check from tsdb api
return true; return true;
} }
@ -174,7 +172,7 @@ void updateInfoDestroy(SUpdateInfo *pInfo) {
SScalableBf *pSBF = taosArrayGetP(pInfo->pTsSBFs, i); SScalableBf *pSBF = taosArrayGetP(pInfo->pTsSBFs, i);
tScalableBfDestroy(pSBF); tScalableBfDestroy(pSBF);
} }
taosArrayDestroy(pInfo->pTsSBFs); taosArrayDestroy(pInfo->pTsSBFs);
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
} }

View File

@ -134,7 +134,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_CLAUSE_ERROR, "not supported stmt cl
// mnode-common // mnode-common
TAOS_DEFINE_ERROR(TSDB_CODE_MND_APP_ERROR, "Mnode internal error") TAOS_DEFINE_ERROR(TSDB_CODE_MND_APP_ERROR, "Mnode internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_READY, "Mnode not ready") TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_READY, "Mnode not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACTION_IN_PROGRESS, "Message is progressing")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NO_RIGHTS, "Insufficient privilege for operation") TAOS_DEFINE_ERROR(TSDB_CODE_MND_NO_RIGHTS, "Insufficient privilege for operation")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CONNECTION, "Invalid message connection") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CONNECTION, "Invalid message connection")

View File

@ -1825,10 +1825,12 @@ int queryColumnTest(TAOS_STMT *stmt, TAOS *taos) {
if (bpBindParam(stmt, data.pBind + n * gCurCase->bindColNum)) { if (bpBindParam(stmt, data.pBind + n * gCurCase->bindColNum)) {
exit(1); exit(1);
} }
if (taos_stmt_add_batch(stmt)) { if (rand() % 2 == 0) {
printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); if (taos_stmt_add_batch(stmt)) {
exit(1); printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
exit(1);
}
} }
if (taos_stmt_execute(stmt) != 0) { if (taos_stmt_execute(stmt) != 0) {
@ -1871,10 +1873,12 @@ int queryMiscTest(TAOS_STMT *stmt, TAOS *taos) {
if (bpBindParam(stmt, data.pBind + n * gCurCase->bindColNum)) { if (bpBindParam(stmt, data.pBind + n * gCurCase->bindColNum)) {
exit(1); exit(1);
} }
if (taos_stmt_add_batch(stmt)) { if (rand() % 2 == 0) {
printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); if (taos_stmt_add_batch(stmt)) {
exit(1); printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt));
exit(1);
}
} }
if (taos_stmt_execute(stmt) != 0) { if (taos_stmt_execute(stmt) != 0) {

View File

@ -28,6 +28,7 @@
./test.sh -f tsim/insert/basic1.sim ./test.sh -f tsim/insert/basic1.sim
./test.sh -f tsim/insert/backquote.sim ./test.sh -f tsim/insert/backquote.sim
./test.sh -f tsim/insert/null.sim ./test.sh -f tsim/insert/null.sim
./test.sh -f tsim/insert/update0.sim
# ---- parser # ---- parser
./test.sh -f tsim/parser/groupby-basic.sim ./test.sh -f tsim/parser/groupby-basic.sim

View File

@ -28,15 +28,17 @@ if $rows != 1 then
return -1 return -1
endi endi
print =============== step2 #TODO OPEN THIS WHEN STABLE DELETE WORKS
sql drop table $mt #print =============== step2
sql show stables #sql drop table $mt
if $rows != 0 then #sql show stables
return -1 #if $rows != 0 then
endi # return -1
#endi
print =============== step3 #print =============== step3
sql create table $mt (ts timestamp, speed int) TAGS(sp int) #sql create table $mt (ts timestamp, speed int) TAGS(sp int)
#TODO OPEN THIS WHEN STABLE DELETE WORKS
sql show stables sql show stables
if $rows != 1 then if $rows != 1 then
@ -134,4 +136,4 @@ if $rows != 2 then
return -1 return -1
endi endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -0,0 +1,8 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
sql create database d0 vgroups 1;
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -14,7 +14,7 @@ class TDTestCase:
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor()) tdSql.init(conn.cursor(), logSql)
def getBuildPath(self): def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__)) selfPath = os.path.dirname(os.path.realpath(__file__))

View File

@ -98,6 +98,8 @@ SScript *simProcessCallOver(SScript *script) {
return NULL; return NULL;
} }
if (simScriptPos == -1) return NULL;
return simScriptList[simScriptPos]; return simScriptList[simScriptPos];
} else { } else {
simDebug("script:%s, is stopped", script->fileName); simDebug("script:%s, is stopped", script->fileName);

View File

@ -742,6 +742,7 @@ void shellReadHistory() {
int32_t read_size = 0; int32_t read_size = 0;
while ((read_size = taosGetLineFile(pFile, &line)) != -1) { while ((read_size = taosGetLineFile(pFile, &line)) != -1) {
line[read_size - 1] = '\0'; line[read_size - 1] = '\0';
taosMemoryFree(pHistory->hist[pHistory->hend]);
pHistory->hist[pHistory->hend] = strdup(line); pHistory->hist[pHistory->hend] = strdup(line);
pHistory->hend = (pHistory->hend + 1) % SHELL_MAX_HISTORY_SIZE; pHistory->hend = (pHistory->hend + 1) % SHELL_MAX_HISTORY_SIZE;
@ -763,7 +764,8 @@ void shellWriteHistory() {
for (int32_t i = pHistory->hstart; i != pHistory->hend;) { for (int32_t i = pHistory->hstart; i != pHistory->hend;) {
if (pHistory->hist[i] != NULL) { if (pHistory->hist[i] != NULL) {
taosFprintfFile(pFile, "%s\n", pHistory->hist[i]); taosFprintfFile(pFile, "%s\n", pHistory->hist[i]);
taosMemoryFreeClear(pHistory->hist[i]); taosMemoryFree(pHistory->hist[i]);
pHistory->hist[i] = NULL;
} }
i = (i + 1) % SHELL_MAX_HISTORY_SIZE; i = (i + 1) % SHELL_MAX_HISTORY_SIZE;
} }
@ -771,6 +773,16 @@ void shellWriteHistory() {
taosCloseFile(&pFile); taosCloseFile(&pFile);
} }
void shellCleanupHistory() {
SShellHistory *pHistory = &shell.history;
for (int32_t i = 0; i < SHELL_MAX_HISTORY_SIZE; ++i) {
if (pHistory->hist[i] != NULL) {
taosMemoryFree(pHistory->hist[i]);
pHistory->hist[i] = NULL;
}
}
}
void shellPrintError(TAOS_RES *tres, int64_t st) { void shellPrintError(TAOS_RES *tres, int64_t st) {
int64_t et = taosGetTimestampUs(); int64_t et = taosGetTimestampUs();
fprintf(stderr, "\nDB error: %s (%.6fs)\n", taos_errstr(tres), (et - st) / 1E6); fprintf(stderr, "\nDB error: %s (%.6fs)\n", taos_errstr(tres), (et - st) / 1E6);
@ -971,6 +983,7 @@ int32_t shellExecute() {
taos_close(shell.conn); taos_close(shell.conn);
shellWriteHistory(); shellWriteHistory();
shellCleanupHistory();
return 0; return 0;
} }
@ -996,5 +1009,6 @@ int32_t shellExecute() {
taosThreadClear(&shell.pid); taosThreadClear(&shell.pid);
} }
shellCleanupHistory();
return 0; return 0;
} }