Merge remote-tracking branch 'origin/3.0' into feature/3.0_liaohj
This commit is contained in:
commit
2288d35194
|
@ -1432,7 +1432,6 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct SVCreateTbReq {
|
typedef struct SVCreateTbReq {
|
||||||
int64_t ver; // use a general definition
|
int64_t ver; // use a general definition
|
||||||
char* dbFName;
|
|
||||||
char* name;
|
char* name;
|
||||||
uint32_t ttl;
|
uint32_t ttl;
|
||||||
uint32_t keep;
|
uint32_t keep;
|
||||||
|
|
|
@ -199,7 +199,7 @@ typedef enum EOperatorType {
|
||||||
} EOperatorType;
|
} EOperatorType;
|
||||||
|
|
||||||
typedef enum ELogicConditionType {
|
typedef enum ELogicConditionType {
|
||||||
LOGIC_COND_TYPE_AND,
|
LOGIC_COND_TYPE_AND = 1,
|
||||||
LOGIC_COND_TYPE_OR,
|
LOGIC_COND_TYPE_OR,
|
||||||
LOGIC_COND_TYPE_NOT,
|
LOGIC_COND_TYPE_NOT,
|
||||||
} ELogicConditionType;
|
} ELogicConditionType;
|
||||||
|
|
|
@ -398,7 +398,6 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
|
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->ver);
|
tlen += taosEncodeFixedI64(buf, pReq->ver);
|
||||||
tlen += taosEncodeString(buf, pReq->dbFName);
|
|
||||||
tlen += taosEncodeString(buf, pReq->name);
|
tlen += taosEncodeString(buf, pReq->name);
|
||||||
tlen += taosEncodeFixedU32(buf, pReq->ttl);
|
tlen += taosEncodeFixedU32(buf, pReq->ttl);
|
||||||
tlen += taosEncodeFixedU32(buf, pReq->keep);
|
tlen += taosEncodeFixedU32(buf, pReq->keep);
|
||||||
|
@ -467,7 +466,6 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
||||||
|
|
||||||
void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
||||||
buf = taosDecodeFixedI64(buf, &(pReq->ver));
|
buf = taosDecodeFixedI64(buf, &(pReq->ver));
|
||||||
buf = taosDecodeString(buf, &(pReq->dbFName));
|
|
||||||
buf = taosDecodeString(buf, &(pReq->name));
|
buf = taosDecodeString(buf, &(pReq->name));
|
||||||
buf = taosDecodeFixedU32(buf, &(pReq->ttl));
|
buf = taosDecodeFixedU32(buf, &(pReq->ttl));
|
||||||
buf = taosDecodeFixedU32(buf, &(pReq->keep));
|
buf = taosDecodeFixedU32(buf, &(pReq->keep));
|
||||||
|
|
|
@ -36,12 +36,12 @@ static int32_t dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
SStatusRsp statusRsp = {0};
|
SStatusRsp statusRsp = {0};
|
||||||
if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
|
if (pRsp->pCont != NULL && pRsp->contLen > 0 && tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
|
||||||
tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
|
|
||||||
pDnode->data.dnodeVer = statusRsp.dnodeVer;
|
pDnode->data.dnodeVer = statusRsp.dnodeVer;
|
||||||
dmUpdateDnodeCfg(pDnode, &statusRsp.dnodeCfg);
|
dmUpdateDnodeCfg(pDnode, &statusRsp.dnodeCfg);
|
||||||
dmUpdateEps(pDnode, statusRsp.pDnodeEps);
|
dmUpdateEps(pDnode, statusRsp.pDnodeEps);
|
||||||
}
|
}
|
||||||
|
rpcFreeCont(pRsp->pCont);
|
||||||
tFreeSStatusRsp(&statusRsp);
|
tFreeSStatusRsp(&statusRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -82,13 +82,13 @@ int32_t vmProcessGetVnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
|
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
|
||||||
|
memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));
|
||||||
|
|
||||||
pCfg->vgId = pCreate->vgId;
|
pCfg->vgId = pCreate->vgId;
|
||||||
|
strcpy(pCfg->dbname, pCreate->db);
|
||||||
pCfg->wsize = pCreate->cacheBlockSize * 1024 * 1024;
|
pCfg->wsize = pCreate->cacheBlockSize * 1024 * 1024;
|
||||||
pCfg->ssize = 1024;
|
pCfg->ssize = 1024;
|
||||||
pCfg->lsize = 1024 * 1024;
|
pCfg->lsize = 1024 * 1024;
|
||||||
pCfg->isHeapAllocator = true;
|
|
||||||
pCfg->ttl = 4;
|
|
||||||
pCfg->keep = pCreate->daysToKeep0;
|
|
||||||
pCfg->streamMode = pCreate->streamMode;
|
pCfg->streamMode = pCreate->streamMode;
|
||||||
pCfg->isWeak = true;
|
pCfg->isWeak = true;
|
||||||
pCfg->tsdbCfg.keep2 = pCreate->daysToKeep0;
|
pCfg->tsdbCfg.keep2 = pCreate->daysToKeep0;
|
||||||
|
@ -96,12 +96,6 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
|
||||||
pCfg->tsdbCfg.keep1 = pCreate->daysToKeep0;
|
pCfg->tsdbCfg.keep1 = pCreate->daysToKeep0;
|
||||||
pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize;
|
pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize;
|
||||||
pCfg->tsdbCfg.retentions = pCreate->pRetensions;
|
pCfg->tsdbCfg.retentions = pCreate->pRetensions;
|
||||||
pCfg->walCfg.level = TAOS_WAL_WRITE;
|
|
||||||
pCfg->walCfg.fsyncPeriod = 0;
|
|
||||||
pCfg->walCfg.retentionPeriod = 0;
|
|
||||||
pCfg->walCfg.retentionSize = 0;
|
|
||||||
pCfg->walCfg.rollPeriod = 0;
|
|
||||||
pCfg->walCfg.segSize = 0;
|
|
||||||
pCfg->walCfg.vgId = pCreate->vgId;
|
pCfg->walCfg.vgId = pCreate->vgId;
|
||||||
pCfg->hashBegin = pCreate->hashBegin;
|
pCfg->hashBegin = pCreate->hashBegin;
|
||||||
pCfg->hashEnd = pCreate->hashEnd;
|
pCfg->hashEnd = pCreate->hashEnd;
|
||||||
|
|
|
@ -97,6 +97,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
||||||
|
|
||||||
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
SVnodeObj *pVnode = pInfo->ahandle;
|
SVnodeObj *pVnode = pInfo->ahandle;
|
||||||
|
int64_t version;
|
||||||
|
|
||||||
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
|
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
|
||||||
if (pArray == NULL) {
|
if (pArray == NULL) {
|
||||||
|
@ -115,23 +116,32 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
vnodePreprocessWriteReqs(pVnode->pImpl, pArray);
|
vnodePreprocessWriteReqs(pVnode->pImpl, pArray, &version);
|
||||||
|
|
||||||
numOfMsgs = taosArrayGetSize(pArray);
|
numOfMsgs = taosArrayGetSize(pArray);
|
||||||
for (int32_t i = 0; i < numOfMsgs; i++) {
|
for (int32_t i = 0; i < numOfMsgs; i++) {
|
||||||
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
|
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
|
||||||
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
||||||
SRpcMsg *pRsp = NULL;
|
SRpcMsg rsp;
|
||||||
|
|
||||||
int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, &pRsp);
|
rsp.pCont = NULL;
|
||||||
|
rsp.contLen = 0;
|
||||||
|
rsp.code = 0;
|
||||||
|
rsp.handle = pRpc->handle;
|
||||||
|
rsp.ahandle = pRpc->ahandle;
|
||||||
|
|
||||||
|
int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, version++, &rsp);
|
||||||
|
tmsgSendRsp(&rsp);
|
||||||
|
|
||||||
|
#if 0
|
||||||
if (pRsp != NULL) {
|
if (pRsp != NULL) {
|
||||||
pRsp->ahandle = pRpc->ahandle;
|
pRsp->ahandle = pRpc->ahandle;
|
||||||
tmsgSendRsp(pRsp);
|
|
||||||
taosMemoryFree(pRsp);
|
taosMemoryFree(pRsp);
|
||||||
} else {
|
} else {
|
||||||
if (code != 0 && terrno != 0) code = terrno;
|
if (code != 0 && terrno != 0) code = terrno;
|
||||||
vmSendRsp(pVnode->pWrapper, pMsg, code);
|
vmSendRsp(pVnode->pWrapper, pMsg, code);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfMsgs; i++) {
|
for (int32_t i = 0; i < numOfMsgs; i++) {
|
||||||
|
@ -153,7 +163,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
||||||
|
|
||||||
// todo
|
// todo
|
||||||
SRpcMsg *pRsp = NULL;
|
SRpcMsg *pRsp = NULL;
|
||||||
(void)vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
|
// (void)vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -41,8 +41,8 @@ static const SInfosTableSchema mnodesSchema[] = {
|
||||||
|
|
||||||
static const SInfosTableSchema modulesSchema[] = {
|
static const SInfosTableSchema modulesSchema[] = {
|
||||||
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "endpoint", .bytes = 134, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "endpoint", .bytes = 134 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "module", .bytes = 10, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "module", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SInfosTableSchema qnodesSchema[] = {
|
static const SInfosTableSchema qnodesSchema[] = {
|
||||||
|
@ -145,9 +145,9 @@ static const SInfosTableSchema userTblsSchema[] = {
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SInfosTableSchema userTblDistSchema[] = {
|
static const SInfosTableSchema userTblDistSchema[] = {
|
||||||
{.name = "db_name", .bytes = 32, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "db_name", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "table_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "table_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "distributed_histogram", .bytes = 500, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "distributed_histogram", .bytes = 500 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "min_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "min_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "max_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "max_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "avg_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "avg_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
|
@ -168,35 +168,33 @@ static const SInfosTableSchema userUsersSchema[] = {
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SInfosTableSchema grantsSchema[] = {
|
static const SInfosTableSchema grantsSchema[] = {
|
||||||
{.name = "version", .bytes = 8 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "version", .bytes = 8 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "expire time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "expire time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "storage(GB)", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "storage(GB)", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "databases", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "databases", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "users", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "users", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "accounts", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "accounts", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "connections", .bytes = 11 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "connections", .bytes = 11 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "streams", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "streams", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "cpu cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "cpu cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "speed(PPS)", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "speed(PPS)", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "querytime", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "querytime", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SInfosTableSchema vgroupsSchema[] = {
|
static const SInfosTableSchema vgroupsSchema[] = {
|
||||||
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "tables", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "tables", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
|
||||||
{.name = "onlines", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
|
||||||
{.name = "v1_dnode", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "v1_dnode", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "v1_status", .bytes = 10, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "v1_status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "v2_dnode", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "v2_dnode", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "v2_status", .bytes = 10, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "v2_status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "v3_dnode", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "v3_dnode", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "v3_status", .bytes = 10, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "v3_status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "compacting", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "nfiles", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "nfiles", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "file_size", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "file_size", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
};
|
};
|
||||||
|
@ -206,7 +204,7 @@ static const SInfosTableSchema topicSchema[] = {
|
||||||
{.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
{.name = "sql", .bytes = 1024, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "sql", .bytes = 1024 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "row_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "row_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -349,7 +349,6 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
|
||||||
|
|
||||||
SVCreateTbReq req = {0};
|
SVCreateTbReq req = {0};
|
||||||
req.ver = 0;
|
req.ver = 0;
|
||||||
req.dbFName = dbFName;
|
|
||||||
req.name = (char *)tNameGetTableName(&name);
|
req.name = (char *)tNameGetTableName(&name);
|
||||||
req.ttl = 0;
|
req.ttl = 0;
|
||||||
req.keep = 0;
|
req.keep = 0;
|
||||||
|
|
|
@ -537,17 +537,6 @@ static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock*
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->numOfTables, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->numOfTables, false);
|
||||||
|
|
||||||
// status
|
|
||||||
char buf[10] = {0};
|
|
||||||
STR_TO_VARSTR(buf, "ready"); // TODO
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataAppend(pColInfo, numOfRows, buf, false);
|
|
||||||
|
|
||||||
// onlines
|
|
||||||
int32_t onlines = pVgroup->replica;
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)&onlines, false);
|
|
||||||
|
|
||||||
// default 3 replica
|
// default 3 replica
|
||||||
for (int32_t i = 0; i < 3; ++i) {
|
for (int32_t i = 0; i < 3; ++i) {
|
||||||
|
|
||||||
|
|
|
@ -40,14 +40,16 @@ typedef struct SVnode SVnode;
|
||||||
typedef struct STsdbCfg STsdbCfg; // todo: remove
|
typedef struct STsdbCfg STsdbCfg; // todo: remove
|
||||||
typedef struct SVnodeCfg SVnodeCfg;
|
typedef struct SVnodeCfg SVnodeCfg;
|
||||||
|
|
||||||
|
extern const SVnodeCfg vnodeCfgDefault;
|
||||||
|
|
||||||
int vnodeInit(int nthreads);
|
int vnodeInit(int nthreads);
|
||||||
void vnodeCleanup();
|
void vnodeCleanup();
|
||||||
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
|
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
|
||||||
void vnodeDestroy(const char *path, STfs *pTfs);
|
void vnodeDestroy(const char *path, STfs *pTfs);
|
||||||
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
|
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
|
||||||
void vnodeClose(SVnode *pVnode);
|
void vnodeClose(SVnode *pVnode);
|
||||||
void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs);
|
int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version);
|
||||||
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
|
||||||
int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
|
@ -133,6 +135,7 @@ struct STsdbCfg {
|
||||||
|
|
||||||
struct SVnodeCfg {
|
struct SVnodeCfg {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
|
char dbname[TSDB_DB_NAME_LEN];
|
||||||
uint64_t dbId;
|
uint64_t dbId;
|
||||||
uint64_t wsize;
|
uint64_t wsize;
|
||||||
uint64_t ssize;
|
uint64_t ssize;
|
||||||
|
|
|
@ -31,6 +31,8 @@ extern "C" {
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
// vnodeCfg ====================
|
// vnodeCfg ====================
|
||||||
|
int vnodeEncodeConfig(const void* pObj, SJson* pJson);
|
||||||
|
int vnodeDecodeConfig(const SJson* pJson, void* pObj);
|
||||||
|
|
||||||
// vnodeModule ====================
|
// vnodeModule ====================
|
||||||
int vnodeScheduleTask(int (*execute)(void*), void* arg);
|
int vnodeScheduleTask(int (*execute)(void*), void* arg);
|
||||||
|
|
|
@ -16,13 +16,113 @@
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
const SVnodeCfg vnodeCfgDefault = {
|
const SVnodeCfg vnodeCfgDefault = {
|
||||||
.wsize = 96 * 1024 * 1024, .ssize = 1 * 1024 * 1024, .lsize = 1024, .walCfg = {.level = TAOS_WAL_WRITE}};
|
.vgId = -1,
|
||||||
|
.dbname = "",
|
||||||
|
.dbId = 0,
|
||||||
|
.wsize = 96 * 1024 * 1024,
|
||||||
|
.ssize = 1 * 1024 * 1024,
|
||||||
|
.lsize = 1024,
|
||||||
|
.isHeapAllocator = false,
|
||||||
|
.ttl = 0,
|
||||||
|
.keep = 0,
|
||||||
|
.streamMode = 0,
|
||||||
|
.isWeak = 0,
|
||||||
|
.tsdbCfg = {.precision = TWO_STAGE_COMP,
|
||||||
|
.update = 0,
|
||||||
|
.compression = 2,
|
||||||
|
.days = 10,
|
||||||
|
.minRows = 100,
|
||||||
|
.maxRows = 4096,
|
||||||
|
.keep2 = 3650,
|
||||||
|
.keep0 = 3650,
|
||||||
|
.keep1 = 3650},
|
||||||
|
.walCfg =
|
||||||
|
{.vgId = -1, .fsyncPeriod = 0, .retentionPeriod = 0, .rollPeriod = 0, .segSize = 0, .level = TAOS_WAL_WRITE},
|
||||||
|
.hashBegin = 0,
|
||||||
|
.hashEnd = 0,
|
||||||
|
.hashMethod = 0};
|
||||||
|
|
||||||
int vnodeCheckCfg(const SVnodeCfg *pCfg) {
|
int vnodeCheckCfg(const SVnodeCfg *pCfg) {
|
||||||
// TODO
|
// TODO
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
|
||||||
|
const SVnodeCfg *pCfg = (SVnodeCfg *)pObj;
|
||||||
|
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "vgId", pCfg->vgId) < 0) return -1;
|
||||||
|
if (tjsonAddStringToObject(pJson, "dbname", pCfg->dbname) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "dbId", pCfg->dbId) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "wsize", pCfg->wsize) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "ssize", pCfg->ssize) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "lsize", pCfg->lsize) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "isHeap", pCfg->isHeapAllocator) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "ttl", pCfg->ttl) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "keep", pCfg->keep) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "streamMode", pCfg->streamMode) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "isWeak", pCfg->isWeak) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "daysPerFile", pCfg->tsdbCfg.days) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "minRows", pCfg->tsdbCfg.minRows) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "maxRows", pCfg->tsdbCfg.maxRows) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "lruCacheSize", pCfg->tsdbCfg.lruCacheSize) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "wal.segSize", pCfg->walCfg.segSize) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "wal.level", pCfg->walCfg.level) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
||||||
|
SVnodeCfg *pCfg = (SVnodeCfg *)pObj;
|
||||||
|
|
||||||
|
if (tjsonGetNumberValue(pJson, "vgId", pCfg->vgId) < 0) return -1;
|
||||||
|
if (tjsonGetStringValue(pJson, "dbname", pCfg->dbname) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "dbId", pCfg->dbId) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "wsize", pCfg->wsize) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "ssize", pCfg->ssize) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "lsize", pCfg->lsize) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "isHeap", pCfg->isHeapAllocator) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "ttl", pCfg->ttl) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "keep", pCfg->keep) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "streamMode", pCfg->streamMode) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "isWeak", pCfg->isWeak) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "daysPerFile", pCfg->tsdbCfg.days) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "minRows", pCfg->tsdbCfg.minRows) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "maxRows", pCfg->tsdbCfg.maxRows) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "lruCacheSize", pCfg->tsdbCfg.lruCacheSize) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "wal.segSize", pCfg->walCfg.segSize) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "wal.level", pCfg->walCfg.level) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName) {
|
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName) {
|
||||||
uint32_t hashValue = 0;
|
uint32_t hashValue = 0;
|
||||||
|
|
||||||
|
|
|
@ -181,80 +181,6 @@ static int vnodeEndCommit(SVnode *pVnode) {
|
||||||
|
|
||||||
static FORCE_INLINE void vnodeWaitCommit(SVnode *pVnode) { tsem_wait(&pVnode->canCommit); }
|
static FORCE_INLINE void vnodeWaitCommit(SVnode *pVnode) { tsem_wait(&pVnode->canCommit); }
|
||||||
|
|
||||||
static int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
|
|
||||||
const SVnodeCfg *pCfg = (SVnodeCfg *)pObj;
|
|
||||||
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "vgId", pCfg->vgId) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "dbId", pCfg->dbId) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "wsize", pCfg->wsize) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "ssize", pCfg->ssize) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "lsize", pCfg->lsize) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "isHeap", pCfg->isHeapAllocator) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "ttl", pCfg->ttl) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "keep", pCfg->keep) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "streamMode", pCfg->streamMode) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "isWeak", pCfg->isWeak) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "daysPerFile", pCfg->tsdbCfg.days) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "minRows", pCfg->tsdbCfg.minRows) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "maxRows", pCfg->tsdbCfg.maxRows) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "lruCacheSize", pCfg->tsdbCfg.lruCacheSize) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "wal.segSize", pCfg->walCfg.segSize) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "wal.level", pCfg->walCfg.level) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
|
|
||||||
if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
|
||||||
SVnodeCfg *pCfg = (SVnodeCfg *)pObj;
|
|
||||||
|
|
||||||
if (tjsonGetNumberValue(pJson, "vgId", pCfg->vgId) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "dbId", pCfg->dbId) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "wsize", pCfg->wsize) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "ssize", pCfg->ssize) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "lsize", pCfg->lsize) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "isHeap", pCfg->isHeapAllocator) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "ttl", pCfg->ttl) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "keep", pCfg->keep) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "streamMode", pCfg->streamMode) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "isWeak", pCfg->isWeak) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "daysPerFile", pCfg->tsdbCfg.days) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "minRows", pCfg->tsdbCfg.minRows) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "maxRows", pCfg->tsdbCfg.maxRows) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "lruCacheSize", pCfg->tsdbCfg.lruCacheSize) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "wal.segSize", pCfg->walCfg.segSize) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "wal.level", pCfg->walCfg.level) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
|
|
||||||
if (tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int vnodeEncodeState(const void *pObj, SJson *pJson) {
|
static int vnodeEncodeState(const void *pObj, SJson *pJson) {
|
||||||
const SVState *pState = (SVState *)pObj;
|
const SVState *pState = (SVState *)pObj;
|
||||||
|
|
||||||
|
|
|
@ -16,34 +16,32 @@
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq);
|
static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq);
|
||||||
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg **pRsp);
|
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg *pRsp);
|
||||||
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq);
|
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq);
|
||||||
static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp);
|
static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp);
|
||||||
|
|
||||||
void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs) {
|
int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) {
|
||||||
SNodeMsg *pMsg;
|
SNodeMsg *pMsg;
|
||||||
SRpcMsg *pRpc;
|
SRpcMsg *pRpc;
|
||||||
|
|
||||||
|
*version = pVnode->state.processed;
|
||||||
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
|
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
|
||||||
pMsg = *(SNodeMsg **)taosArrayGet(pMsgs, i);
|
pMsg = *(SNodeMsg **)taosArrayGet(pMsgs, i);
|
||||||
pRpc = &pMsg->rpcMsg;
|
pRpc = &pMsg->rpcMsg;
|
||||||
|
|
||||||
// set request version
|
// set request version
|
||||||
void *pBuf = POINTER_SHIFT(pRpc->pCont, sizeof(SMsgHead));
|
if (walWrite(pVnode->pWal, pVnode->state.processed++, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) {
|
||||||
int64_t ver = pVnode->state.processed++;
|
|
||||||
taosEncodeFixedI64(&pBuf, ver);
|
|
||||||
|
|
||||||
if (walWrite(pVnode->pWal, ver, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) {
|
|
||||||
// TODO: handle error
|
|
||||||
/*ASSERT(false);*/
|
|
||||||
vError("vnode:%d write wal error since %s", TD_VID(pVnode), terrstr());
|
vError("vnode:%d write wal error since %s", TD_VID(pVnode), terrstr());
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
walFsync(pVnode->pWal, false);
|
walFsync(pVnode->pWal, false);
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
|
||||||
void *ptr = NULL;
|
void *ptr = NULL;
|
||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
|
@ -58,9 +56,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo: change the interface here
|
// todo: change the interface here
|
||||||
int64_t ver;
|
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
|
||||||
taosDecodeFixedI64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver);
|
|
||||||
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, ver) < 0) {
|
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -69,6 +65,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
ret = vnodeProcessCreateStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)));
|
ret = vnodeProcessCreateStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)));
|
||||||
return 0;
|
return 0;
|
||||||
case TDMT_VND_CREATE_TABLE:
|
case TDMT_VND_CREATE_TABLE:
|
||||||
|
pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
|
||||||
return vnodeProcessCreateTbReq(pVnode, pMsg, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pRsp);
|
return vnodeProcessCreateTbReq(pVnode, pMsg, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pRsp);
|
||||||
case TDMT_VND_ALTER_STB:
|
case TDMT_VND_ALTER_STB:
|
||||||
return vnodeProcessAlterStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)));
|
return vnodeProcessAlterStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)));
|
||||||
|
@ -78,14 +75,8 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
case TDMT_VND_DROP_TABLE:
|
case TDMT_VND_DROP_TABLE:
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_SUBMIT:
|
case TDMT_VND_SUBMIT:
|
||||||
/*printf("vnode %d write data %ld\n", TD_VID(pVnode), ver);*/
|
pRsp->msgType = TDMT_VND_SUBMIT_RSP;
|
||||||
if (pVnode->config.streamMode == 0) {
|
return vnodeProcessSubmitReq(pVnode, ptr, pRsp);
|
||||||
*pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg));
|
|
||||||
(*pRsp)->handle = pMsg->handle;
|
|
||||||
(*pRsp)->ahandle = pMsg->ahandle;
|
|
||||||
return vnodeProcessSubmitReq(pVnode, ptr, *pRsp);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case TDMT_VND_MQ_SET_CONN: {
|
case TDMT_VND_MQ_SET_CONN: {
|
||||||
if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
|
if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
|
@ -128,7 +119,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
pVnode->state.applied = ver;
|
pVnode->state.applied = version;
|
||||||
|
|
||||||
// Check if it needs to commit
|
// Check if it needs to commit
|
||||||
if (vnodeShouldCommit(pVnode)) {
|
if (vnodeShouldCommit(pVnode)) {
|
||||||
|
@ -217,13 +208,12 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq) {
|
||||||
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam->pFuncIds);
|
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam->pFuncIds);
|
||||||
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam);
|
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam);
|
||||||
}
|
}
|
||||||
taosMemoryFree(vCreateTbReq.dbFName);
|
|
||||||
taosMemoryFree(vCreateTbReq.name);
|
taosMemoryFree(vCreateTbReq.name);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg **pRsp) {
|
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg *pRsp) {
|
||||||
SVCreateTbBatchReq vCreateTbBatchReq = {0};
|
SVCreateTbBatchReq vCreateTbBatchReq = {0};
|
||||||
SVCreateTbBatchRsp vCreateTbBatchRsp = {0};
|
SVCreateTbBatchRsp vCreateTbBatchRsp = {0};
|
||||||
tDeserializeSVCreateTbBatchReq(pReq, &vCreateTbBatchReq);
|
tDeserializeSVCreateTbBatchReq(pReq, &vCreateTbBatchReq);
|
||||||
|
@ -233,7 +223,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
|
||||||
|
|
||||||
char tableFName[TSDB_TABLE_FNAME_LEN];
|
char tableFName[TSDB_TABLE_FNAME_LEN];
|
||||||
SMsgHead *pHead = (SMsgHead *)pMsg->pCont;
|
SMsgHead *pHead = (SMsgHead *)pMsg->pCont;
|
||||||
sprintf(tableFName, "%s.%s", pCreateTbReq->dbFName, pCreateTbReq->name);
|
sprintf(tableFName, "%s.%s", pVnode->config.dbname, pCreateTbReq->name);
|
||||||
|
|
||||||
int32_t code = vnodeValidateTableHash(&pVnode->config, tableFName);
|
int32_t code = vnodeValidateTableHash(&pVnode->config, tableFName);
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -249,7 +239,6 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
|
||||||
}
|
}
|
||||||
// TODO: to encapsule a free API
|
// TODO: to encapsule a free API
|
||||||
taosMemoryFree(pCreateTbReq->name);
|
taosMemoryFree(pCreateTbReq->name);
|
||||||
taosMemoryFree(pCreateTbReq->dbFName);
|
|
||||||
if (pCreateTbReq->type == TD_SUPER_TABLE) {
|
if (pCreateTbReq->type == TD_SUPER_TABLE) {
|
||||||
taosMemoryFree(pCreateTbReq->stbCfg.pSchema);
|
taosMemoryFree(pCreateTbReq->stbCfg.pSchema);
|
||||||
taosMemoryFree(pCreateTbReq->stbCfg.pTagSchema);
|
taosMemoryFree(pCreateTbReq->stbCfg.pTagSchema);
|
||||||
|
@ -276,12 +265,8 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
|
||||||
tSerializeSVCreateTbBatchRsp(msg, contLen, &vCreateTbBatchRsp);
|
tSerializeSVCreateTbBatchRsp(msg, contLen, &vCreateTbBatchRsp);
|
||||||
taosArrayDestroy(vCreateTbBatchRsp.rspList);
|
taosArrayDestroy(vCreateTbBatchRsp.rspList);
|
||||||
|
|
||||||
*pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg));
|
pRsp->pCont = msg;
|
||||||
(*pRsp)->msgType = TDMT_VND_CREATE_TABLE_RSP;
|
pRsp->contLen = contLen;
|
||||||
(*pRsp)->pCont = msg;
|
|
||||||
(*pRsp)->contLen = contLen;
|
|
||||||
(*pRsp)->handle = pMsg->handle;
|
|
||||||
(*pRsp)->ahandle = pMsg->ahandle;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -298,7 +283,6 @@ static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq) {
|
||||||
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam->pFuncIds);
|
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam->pFuncIds);
|
||||||
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam);
|
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam);
|
||||||
}
|
}
|
||||||
taosMemoryFree(vAlterTbReq.dbFName);
|
|
||||||
taosMemoryFree(vAlterTbReq.name);
|
taosMemoryFree(vAlterTbReq.name);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -315,7 +299,6 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg
|
||||||
}
|
}
|
||||||
|
|
||||||
// encode the response (TODO)
|
// encode the response (TODO)
|
||||||
pRsp->msgType = TDMT_VND_SUBMIT_RSP;
|
|
||||||
pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp));
|
pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp));
|
||||||
memcpy(pRsp->pCont, &rsp, sizeof(rsp));
|
memcpy(pRsp->pCont, &rsp, sizeof(rsp));
|
||||||
pRsp->contLen = sizeof(SSubmitRsp);
|
pRsp->contLen = sizeof(SSubmitRsp);
|
||||||
|
|
|
@ -646,7 +646,7 @@ predicate(A) ::= expression(B) BETWEEN expression(C) AND expression(D).
|
||||||
predicate(A) ::= expression(B) NOT BETWEEN expression(C) AND expression(D). {
|
predicate(A) ::= expression(B) NOT BETWEEN expression(C) AND expression(D). {
|
||||||
SToken s = getTokenFromRawExprNode(pCxt, B);
|
SToken s = getTokenFromRawExprNode(pCxt, B);
|
||||||
SToken e = getTokenFromRawExprNode(pCxt, D);
|
SToken e = getTokenFromRawExprNode(pCxt, D);
|
||||||
A = createRawExprNodeExt(pCxt, &s, &e, createNotBetweenAnd(pCxt, releaseRawExprNode(pCxt, C), releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, D)));
|
A = createRawExprNodeExt(pCxt, &s, &e, createNotBetweenAnd(pCxt, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C), releaseRawExprNode(pCxt, D)));
|
||||||
}
|
}
|
||||||
predicate(A) ::= expression(B) IS NULL(C). {
|
predicate(A) ::= expression(B) IS NULL(C). {
|
||||||
SToken s = getTokenFromRawExprNode(pCxt, B);
|
SToken s = getTokenFromRawExprNode(pCxt, B);
|
||||||
|
|
|
@ -15,8 +15,8 @@
|
||||||
|
|
||||||
#include "parInsertData.h"
|
#include "parInsertData.h"
|
||||||
#include "parInt.h"
|
#include "parInt.h"
|
||||||
#include "parUtil.h"
|
|
||||||
#include "parToken.h"
|
#include "parToken.h"
|
||||||
|
#include "parUtil.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
#include "ttime.h"
|
#include "ttime.h"
|
||||||
#include "ttypes.h"
|
#include "ttypes.h"
|
||||||
|
@ -228,25 +228,23 @@ static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SToken* pTname, bool
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
createSName(&name, pTname, pBasicCtx, &pCxt->msg);
|
createSName(&name, pTname, pBasicCtx, &pCxt->msg);
|
||||||
if (isStb) {
|
if (isStb) {
|
||||||
CHECK_CODE(catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &pCxt->pTableMeta));
|
CHECK_CODE(catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name,
|
||||||
|
&pCxt->pTableMeta));
|
||||||
} else {
|
} else {
|
||||||
CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &pCxt->pTableMeta));
|
CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name,
|
||||||
|
&pCxt->pTableMeta));
|
||||||
}
|
}
|
||||||
SVgroupInfo vg;
|
SVgroupInfo vg;
|
||||||
CHECK_CODE(catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
|
CHECK_CODE(
|
||||||
|
catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
|
||||||
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
|
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
|
static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) { return getTableMetaImpl(pCxt, pTname, false); }
|
||||||
return getTableMetaImpl(pCxt, pTname, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t getSTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
|
|
||||||
return getTableMetaImpl(pCxt, pTname, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
static int32_t getSTableMeta(SInsertParseContext* pCxt, SToken* pTname) { return getTableMetaImpl(pCxt, pTname, true); }
|
||||||
|
|
||||||
static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
|
static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
|
||||||
while (start < end) {
|
while (start < end) {
|
||||||
|
@ -391,8 +389,10 @@ static int parseTime(char **end, SToken *pToken, int16_t timePrec, int64_t *time
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
|
static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
|
||||||
if ((pToken->type != TK_NOW && pToken->type != TK_TODAY && pToken->type != TK_NK_INTEGER && pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT &&
|
if ((pToken->type != TK_NOW && pToken->type != TK_TODAY && pToken->type != TK_NK_INTEGER &&
|
||||||
pToken->type != TK_NK_BOOL && pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT && pToken->type != TK_NK_BIN) ||
|
pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT && pToken->type != TK_NK_BOOL &&
|
||||||
|
pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT &&
|
||||||
|
pToken->type != TK_NK_BIN) ||
|
||||||
(pToken->n == 0) || (pToken->type == TK_NK_RP)) {
|
(pToken->n == 0) || (pToken->type == TK_NK_RP)) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
|
||||||
}
|
}
|
||||||
|
@ -448,7 +448,8 @@ static FORCE_INLINE int32_t toDouble(SToken *pToken, double *value, char **endPt
|
||||||
return pToken->type;
|
return pToken->type;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf, _row_append_fn_t func, void* param, SMsgBuf* pMsgBuf) {
|
static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf,
|
||||||
|
_row_append_fn_t func, void* param, SMsgBuf* pMsgBuf) {
|
||||||
int64_t iv;
|
int64_t iv;
|
||||||
char* endptr = NULL;
|
char* endptr = NULL;
|
||||||
bool isSigned = false;
|
bool isSigned = false;
|
||||||
|
@ -570,7 +571,8 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int
|
||||||
if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
|
if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
|
||||||
}
|
}
|
||||||
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) {
|
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) ||
|
||||||
|
isnan(dv)) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
|
||||||
}
|
}
|
||||||
float tmpVal = (float)dv;
|
float tmpVal = (float)dv;
|
||||||
|
@ -748,7 +750,8 @@ static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void *value, int32_t len, voi
|
||||||
if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(pa->buf), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
|
if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(pa->buf), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
|
||||||
char buf[512] = {0};
|
char buf[512] = {0};
|
||||||
snprintf(buf, tListLen(buf), "%s", strerror(errno));
|
snprintf(buf, tListLen(buf), "%s", strerror(errno));
|
||||||
return buildSyntaxErrMsg(pMsgBuf, buf, value);;
|
return buildSyntaxErrMsg(pMsgBuf, buf, value);
|
||||||
|
;
|
||||||
}
|
}
|
||||||
|
|
||||||
varDataSetLen(pa->buf, output);
|
varDataSetLen(pa->buf, output);
|
||||||
|
@ -764,7 +767,6 @@ static int32_t buildCreateTbReq(SInsertParseContext* pCxt, const SName* pName, S
|
||||||
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||||
tNameGetFullDbName(pName, dbFName);
|
tNameGetFullDbName(pName, dbFName);
|
||||||
pCxt->createTblReq.type = TD_CHILD_TABLE;
|
pCxt->createTblReq.type = TD_CHILD_TABLE;
|
||||||
pCxt->createTblReq.dbFName = strdup(dbFName);
|
|
||||||
pCxt->createTblReq.name = strdup(pName->tname);
|
pCxt->createTblReq.name = strdup(pName->tname);
|
||||||
pCxt->createTblReq.ctbCfg.suid = pCxt->pTableMeta->suid;
|
pCxt->createTblReq.ctbCfg.suid = pCxt->pTableMeta->suid;
|
||||||
pCxt->createTblReq.ctbCfg.pTag = row;
|
pCxt->createTblReq.ctbCfg.pTag = row;
|
||||||
|
@ -785,7 +787,8 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
|
||||||
NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
|
NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
|
||||||
SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i] - 1]; // colId starts with 1
|
SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i] - 1]; // colId starts with 1
|
||||||
param.schema = pTagSchema;
|
param.schema = pTagSchema;
|
||||||
CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pTagSchema, precision, tmpTokenBuf, KvRowAppend, ¶m, &pCxt->msg));
|
CHECK_CODE(
|
||||||
|
parseValueToken(&pCxt->pSql, &sToken, pTagSchema, precision, tmpTokenBuf, KvRowAppend, ¶m, &pCxt->msg));
|
||||||
}
|
}
|
||||||
|
|
||||||
SKVRow row = tdGetKVRowFromBuilder(&pCxt->tagsBuilder);
|
SKVRow row = tdGetKVRowFromBuilder(&pCxt->tagsBuilder);
|
||||||
|
@ -863,7 +866,8 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken)
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, int16_t timePrec, int32_t* len, char* tmpTokenBuf) {
|
static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, int16_t timePrec, int32_t* len,
|
||||||
|
char* tmpTokenBuf) {
|
||||||
SParsedDataColInfo* spd = &pDataBlocks->boundColumnInfo;
|
SParsedDataColInfo* spd = &pDataBlocks->boundColumnInfo;
|
||||||
SRowBuilder* pBuilder = &pDataBlocks->rowBuilder;
|
SRowBuilder* pBuilder = &pDataBlocks->rowBuilder;
|
||||||
STSRow* row = (STSRow*)(pDataBlocks->pData + pDataBlocks->size); // skip the SSubmitBlk header
|
STSRow* row = (STSRow*)(pDataBlocks->pData + pDataBlocks->size); // skip the SSubmitBlk header
|
||||||
|
@ -967,7 +971,6 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* da
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroyCreateSubTbReq(SVCreateTbReq* pReq) {
|
static void destroyCreateSubTbReq(SVCreateTbReq* pReq) {
|
||||||
taosMemoryFreeClear(pReq->dbFName);
|
|
||||||
taosMemoryFreeClear(pReq->name);
|
taosMemoryFreeClear(pReq->name);
|
||||||
taosMemoryFreeClear(pReq->ctbCfg.pTag);
|
taosMemoryFreeClear(pReq->ctbCfg.pTag);
|
||||||
}
|
}
|
||||||
|
@ -1022,7 +1025,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
||||||
// no data in the sql string anymore.
|
// no data in the sql string anymore.
|
||||||
if (sToken.n == 0) {
|
if (sToken.n == 0) {
|
||||||
if (0 == pCxt->totalNum) {
|
if (0 == pCxt->totalNum) {
|
||||||
return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");;
|
return buildInvalidOperationMsg(&pCxt->msg, "no data in sql");
|
||||||
|
;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1040,7 +1044,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
||||||
|
|
||||||
STableDataBlocks* dataBuf = NULL;
|
STableDataBlocks* dataBuf = NULL;
|
||||||
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
|
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
|
||||||
sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL, &pCxt->createTblReq));
|
sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta,
|
||||||
|
&dataBuf, NULL, &pCxt->createTblReq));
|
||||||
|
|
||||||
if (TK_NK_LP == sToken.type) {
|
if (TK_NK_LP == sToken.type) {
|
||||||
// pSql -> field1_name, ...)
|
// pSql -> field1_name, ...)
|
||||||
|
@ -1070,7 +1075,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
||||||
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z);
|
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z);
|
||||||
}
|
}
|
||||||
// merge according to vgId
|
// merge according to vgId
|
||||||
if (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
|
if (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) &&
|
||||||
|
taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
|
||||||
CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks));
|
CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks));
|
||||||
}
|
}
|
||||||
return buildOutput(pCxt);
|
return buildOutput(pCxt);
|
||||||
|
@ -1092,11 +1098,10 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
|
||||||
.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false),
|
.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false),
|
||||||
.pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, false),
|
.pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, false),
|
||||||
.totalNum = 0,
|
.totalNum = 0,
|
||||||
.pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT)
|
.pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT)};
|
||||||
};
|
|
||||||
|
|
||||||
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj ||
|
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pSubTableHashObj ||
|
||||||
NULL == context.pSubTableHashObj || NULL == context.pOutput) {
|
NULL == context.pOutput) {
|
||||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -251,6 +251,9 @@ static void setColumnInfoByExpr(const STableNode* pTable, SExprNode* pExpr, SCol
|
||||||
pCol->colType = pProjCol->colType;
|
pCol->colType = pProjCol->colType;
|
||||||
}
|
}
|
||||||
strcpy(pCol->colName, pExpr->aliasName);
|
strcpy(pCol->colName, pExpr->aliasName);
|
||||||
|
if ('\0' == pCol->node.aliasName[0]) {
|
||||||
|
strcpy(pCol->node.aliasName, pCol->colName);
|
||||||
|
}
|
||||||
pCol->node.resType = pExpr->resType;
|
pCol->node.resType = pExpr->resType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -381,25 +384,9 @@ static EDealRes translateColumn(STranslateContext* pCxt, SColumnNode* pCol) {
|
||||||
}
|
}
|
||||||
res = (found ? DEAL_RES_CONTINUE : translateColumnWithoutPrefix(pCxt, pCol));
|
res = (found ? DEAL_RES_CONTINUE : translateColumnWithoutPrefix(pCxt, pCol));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (DEAL_RES_ERROR == res) {
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (SQL_CLAUSE_WINDOW == pCxt->currClause && QUERY_NODE_STATE_WINDOW == nodeType(pCxt->pCurrStmt->pWindow)) {
|
|
||||||
if (!IS_INTEGER_TYPE(pCol->node.resType.type)) {
|
|
||||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_TYPE);
|
|
||||||
}
|
|
||||||
if (COLUMN_TYPE_TAG == pCol->colType) {
|
|
||||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_COL);
|
|
||||||
}
|
|
||||||
if (TSDB_SUPER_TABLE == pCol->tableType) {
|
|
||||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_TABLE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return DEAL_RES_CONTINUE;
|
|
||||||
}
|
|
||||||
|
|
||||||
static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
|
static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
|
||||||
uint8_t precision = (NULL != pCxt->pCurrStmt ? pCxt->pCurrStmt->precision : pVal->node.resType.precision);
|
uint8_t precision = (NULL != pCxt->pCurrStmt ? pCxt->pCurrStmt->precision : pVal->node.resType.precision);
|
||||||
pVal->node.resType.precision = precision;
|
pVal->node.resType.precision = precision;
|
||||||
|
@ -1200,9 +1187,27 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode*
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static EDealRes checkStateExpr(SNode* pNode, void* pContext) {
|
||||||
|
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
|
STranslateContext* pCxt = pContext;
|
||||||
|
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||||
|
if (!IS_INTEGER_TYPE(pCol->node.resType.type)) {
|
||||||
|
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_TYPE);
|
||||||
|
}
|
||||||
|
if (COLUMN_TYPE_TAG == pCol->colType) {
|
||||||
|
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_COL);
|
||||||
|
}
|
||||||
|
if (TSDB_SUPER_TABLE == pCol->tableType) {
|
||||||
|
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_TABLE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t checkStateWindow(STranslateContext* pCxt, SStateWindowNode* pState) {
|
static int32_t checkStateWindow(STranslateContext* pCxt, SStateWindowNode* pState) {
|
||||||
|
nodesWalkExprPostOrder(pState->pExpr, checkStateExpr, pCxt);
|
||||||
// todo check for "function not support for state_window"
|
// todo check for "function not support for state_window"
|
||||||
return TSDB_CODE_SUCCESS;
|
return pCxt->errCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t checkSessionWindow(STranslateContext* pCxt, SSessionWindowNode* pSession) {
|
static int32_t checkSessionWindow(STranslateContext* pCxt, SSessionWindowNode* pSession) {
|
||||||
|
@ -2747,7 +2752,6 @@ static void toSchemaEx(const SColumnDefNode* pCol, col_id_t colId, SSchema* pSch
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroyCreateTbReq(SVCreateTbReq* pReq) {
|
static void destroyCreateTbReq(SVCreateTbReq* pReq) {
|
||||||
taosMemoryFreeClear(pReq->dbFName);
|
|
||||||
taosMemoryFreeClear(pReq->name);
|
taosMemoryFreeClear(pReq->name);
|
||||||
taosMemoryFreeClear(pReq->ntbCfg.pSchema);
|
taosMemoryFreeClear(pReq->ntbCfg.pSchema);
|
||||||
}
|
}
|
||||||
|
@ -2784,7 +2788,6 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
|
||||||
|
|
||||||
SVCreateTbReq req = {0};
|
SVCreateTbReq req = {0};
|
||||||
req.type = TD_NORMAL_TABLE;
|
req.type = TD_NORMAL_TABLE;
|
||||||
req.dbFName = strdup(dbFName);
|
|
||||||
req.name = strdup(pStmt->tableName);
|
req.name = strdup(pStmt->tableName);
|
||||||
req.ntbCfg.nCols = LIST_LENGTH(pStmt->pCols);
|
req.ntbCfg.nCols = LIST_LENGTH(pStmt->pCols);
|
||||||
req.ntbCfg.pSchema = taosMemoryCalloc(req.ntbCfg.nCols, sizeof(SSchema));
|
req.ntbCfg.pSchema = taosMemoryCalloc(req.ntbCfg.nCols, sizeof(SSchema));
|
||||||
|
@ -2843,7 +2846,6 @@ static void destroyCreateTbReqBatch(SVgroupTablesBatch* pTbBatch) {
|
||||||
size_t size = taosArrayGetSize(pTbBatch->req.pArray);
|
size_t size = taosArrayGetSize(pTbBatch->req.pArray);
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
SVCreateTbReq* pTableReq = taosArrayGet(pTbBatch->req.pArray, i);
|
SVCreateTbReq* pTableReq = taosArrayGet(pTbBatch->req.pArray, i);
|
||||||
taosMemoryFreeClear(pTableReq->dbFName);
|
|
||||||
taosMemoryFreeClear(pTableReq->name);
|
taosMemoryFreeClear(pTableReq->name);
|
||||||
|
|
||||||
if (pTableReq->type == TSDB_NORMAL_TABLE) {
|
if (pTableReq->type == TSDB_NORMAL_TABLE) {
|
||||||
|
@ -2929,7 +2931,6 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, c
|
||||||
|
|
||||||
struct SVCreateTbReq req = {0};
|
struct SVCreateTbReq req = {0};
|
||||||
req.type = TD_CHILD_TABLE;
|
req.type = TD_CHILD_TABLE;
|
||||||
req.dbFName = strdup(dbFName);
|
|
||||||
req.name = strdup(pTableName);
|
req.name = strdup(pTableName);
|
||||||
req.ctbCfg.suid = suid;
|
req.ctbCfg.suid = suid;
|
||||||
req.ctbCfg.pTag = row;
|
req.ctbCfg.pTag = row;
|
||||||
|
|
|
@ -3730,7 +3730,7 @@ static YYACTIONTYPE yy_reduce(
|
||||||
{
|
{
|
||||||
SToken s = getTokenFromRawExprNode(pCxt, yymsp[-5].minor.yy456);
|
SToken s = getTokenFromRawExprNode(pCxt, yymsp[-5].minor.yy456);
|
||||||
SToken e = getTokenFromRawExprNode(pCxt, yymsp[0].minor.yy456);
|
SToken e = getTokenFromRawExprNode(pCxt, yymsp[0].minor.yy456);
|
||||||
yylhsminor.yy456 = createRawExprNodeExt(pCxt, &s, &e, createNotBetweenAnd(pCxt, releaseRawExprNode(pCxt, yymsp[-2].minor.yy456), releaseRawExprNode(pCxt, yymsp[-5].minor.yy456), releaseRawExprNode(pCxt, yymsp[0].minor.yy456)));
|
yylhsminor.yy456 = createRawExprNodeExt(pCxt, &s, &e, createNotBetweenAnd(pCxt, releaseRawExprNode(pCxt, yymsp[-5].minor.yy456), releaseRawExprNode(pCxt, yymsp[-2].minor.yy456), releaseRawExprNode(pCxt, yymsp[0].minor.yy456)));
|
||||||
}
|
}
|
||||||
yymsp[-5].minor.yy456 = yylhsminor.yy456;
|
yymsp[-5].minor.yy456 = yylhsminor.yy456;
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -77,6 +77,10 @@ static bool osdMayBeOptimized(SLogicNode* pNode) {
|
||||||
if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) {
|
if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
// todo: release after function splitting
|
||||||
|
if (TSDB_SUPER_TABLE == ((SScanLogicNode*)pNode)->pMeta->tableType) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
if (NULL == pNode->pParent ||
|
if (NULL == pNode->pParent ||
|
||||||
(QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pNode->pParent) && QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode->pParent))) {
|
(QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pNode->pParent) && QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode->pParent))) {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -32,6 +32,8 @@ using namespace testing;
|
||||||
} \
|
} \
|
||||||
} while(0);
|
} while(0);
|
||||||
|
|
||||||
|
bool g_isDump = false;
|
||||||
|
|
||||||
class PlannerTestBaseImpl {
|
class PlannerTestBaseImpl {
|
||||||
public:
|
public:
|
||||||
void useDb(const string& acctId, const string& db) {
|
void useDb(const string& acctId, const string& db) {
|
||||||
|
|
|
@ -32,4 +32,6 @@ private:
|
||||||
std::unique_ptr<PlannerTestBaseImpl> impl_;
|
std::unique_ptr<PlannerTestBaseImpl> impl_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
extern bool g_isDump;
|
||||||
|
|
||||||
#endif // PLAN_TEST_UTIL_H
|
#endif // PLAN_TEST_UTIL_H
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
#include "mockCatalog.h"
|
#include "mockCatalog.h"
|
||||||
|
#include "planTestUtil.h"
|
||||||
|
|
||||||
class PlannerEnv : public testing::Environment {
|
class PlannerEnv : public testing::Environment {
|
||||||
public:
|
public:
|
||||||
|
@ -34,8 +35,27 @@ public:
|
||||||
virtual ~PlannerEnv() {}
|
virtual ~PlannerEnv() {}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static void parseArg(int argc, char* argv[]) {
|
||||||
|
int opt = 0;
|
||||||
|
const char *optstring = "";
|
||||||
|
static struct option long_options[] = {
|
||||||
|
{"dump", no_argument, NULL, 'd'},
|
||||||
|
{0, 0, 0, 0}
|
||||||
|
};
|
||||||
|
while ((opt = getopt_long(argc, argv, optstring, long_options, NULL)) != -1) {
|
||||||
|
switch (opt) {
|
||||||
|
case 'd':
|
||||||
|
g_isDump = true;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int main(int argc, char* argv[]) {
|
int main(int argc, char* argv[]) {
|
||||||
testing::AddGlobalTestEnvironment(new PlannerEnv());
|
testing::AddGlobalTestEnvironment(new PlannerEnv());
|
||||||
testing::InitGoogleTest(&argc, argv);
|
testing::InitGoogleTest(&argc, argv);
|
||||||
|
parseArg(argc, argv);
|
||||||
return RUN_ALL_TESTS();
|
return RUN_ALL_TESTS();
|
||||||
}
|
}
|
||||||
|
|
|
@ -212,10 +212,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define CONN_NO_PERSIST_BY_APP(conn) \
|
#define CONN_NO_PERSIST_BY_APP(conn) (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
||||||
(((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
#define CONN_RELEASE_BY_SERVER(conn) (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
||||||
#define CONN_RELEASE_BY_SERVER(conn) \
|
|
||||||
(((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
|
||||||
|
|
||||||
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
|
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
|
||||||
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
|
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
|
||||||
|
@ -290,9 +288,8 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
|
tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
|
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, TMSG_INFO(pHead->msgType),
|
||||||
TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
|
taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen);
|
||||||
taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen);
|
|
||||||
|
|
||||||
conn->secured = pHead->secured;
|
conn->secured = pHead->secured;
|
||||||
|
|
||||||
|
@ -358,12 +355,10 @@ void cliHandleExcept(SCliConn* pConn) {
|
||||||
|
|
||||||
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
|
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
|
||||||
transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
|
transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
|
||||||
tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle,
|
tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle, TMSG_INFO(transMsg.msgType));
|
||||||
TMSG_INFO(transMsg.msgType));
|
|
||||||
if (transMsg.ahandle == NULL) {
|
if (transMsg.ahandle == NULL) {
|
||||||
transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType));
|
transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType));
|
||||||
tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
|
tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle);
|
||||||
transMsg.ahandle);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
|
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
|
||||||
|
@ -546,6 +541,7 @@ static void cliDestroy(uv_handle_t* handle) {
|
||||||
transCtxCleanup(&conn->ctx);
|
transCtxCleanup(&conn->ctx);
|
||||||
transQueueDestroy(&conn->cliMsgs);
|
transQueueDestroy(&conn->cliMsgs);
|
||||||
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
||||||
|
transDestroyBuffer(&conn->readBuf);
|
||||||
taosMemoryFree(conn);
|
taosMemoryFree(conn);
|
||||||
}
|
}
|
||||||
static bool cliHandleNoResp(SCliConn* conn) {
|
static bool cliHandleNoResp(SCliConn* conn) {
|
||||||
|
@ -635,9 +631,8 @@ void cliSend(SCliConn* pConn) {
|
||||||
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
|
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
|
||||||
|
|
||||||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||||
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType),
|
||||||
TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
||||||
taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
|
||||||
|
|
||||||
if (pHead->persist == 1) {
|
if (pHead->persist == 1) {
|
||||||
CONN_SET_PERSIST_BY_APP(pConn);
|
CONN_SET_PERSIST_BY_APP(pConn);
|
||||||
|
@ -675,10 +670,9 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
tDebug("cli work thread %p start to quit", pThrd);
|
tDebug("cli work thread %p start to quit", pThrd);
|
||||||
destroyCmsg(pMsg);
|
destroyCmsg(pMsg);
|
||||||
destroyConnPool(pThrd->pool);
|
destroyConnPool(pThrd->pool);
|
||||||
|
|
||||||
uv_timer_stop(&pThrd->timer);
|
uv_timer_stop(&pThrd->timer);
|
||||||
|
|
||||||
pThrd->quit = true;
|
pThrd->quit = true;
|
||||||
|
|
||||||
uv_stop(pThrd->loop);
|
uv_stop(pThrd->loop);
|
||||||
}
|
}
|
||||||
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
|
|
|
@ -195,7 +195,7 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb)
|
||||||
void transDestroyAsyncPool(SAsyncPool* pool) {
|
void transDestroyAsyncPool(SAsyncPool* pool) {
|
||||||
for (int i = 0; i < pool->nAsync; i++) {
|
for (int i = 0; i < pool->nAsync; i++) {
|
||||||
uv_async_t* async = &(pool->asyncs[i]);
|
uv_async_t* async = &(pool->asyncs[i]);
|
||||||
|
uv_close((uv_handle_t*)async, NULL);
|
||||||
SAsyncItem* item = async->data;
|
SAsyncItem* item = async->data;
|
||||||
taosThreadMutexDestroy(&item->mtx);
|
taosThreadMutexDestroy(&item->mtx);
|
||||||
taosMemoryFree(item);
|
taosMemoryFree(item);
|
||||||
|
|
|
@ -126,6 +126,11 @@ static void uvWorkerAsyncCb(uv_async_t* handle);
|
||||||
static void uvAcceptAsyncCb(uv_async_t* handle);
|
static void uvAcceptAsyncCb(uv_async_t* handle);
|
||||||
static void uvShutDownCb(uv_shutdown_t* req, int status);
|
static void uvShutDownCb(uv_shutdown_t* req, int status);
|
||||||
|
|
||||||
|
static void uvFreeCb(uv_handle_t* handle) {
|
||||||
|
//
|
||||||
|
taosMemoryFree(handle);
|
||||||
|
}
|
||||||
|
|
||||||
static void uvStartSendRespInternal(SSrvMsg* smsg);
|
static void uvStartSendRespInternal(SSrvMsg* smsg);
|
||||||
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
|
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
|
||||||
static void uvStartSendResp(SSrvMsg* msg);
|
static void uvStartSendResp(SSrvMsg* msg);
|
||||||
|
@ -141,8 +146,7 @@ static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
|
||||||
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
|
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
|
||||||
static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd);
|
static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd);
|
||||||
static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd);
|
static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd);
|
||||||
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
|
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, uvHandleRegister};
|
||||||
uvHandleRegister};
|
|
||||||
|
|
||||||
static void uvDestroyConn(uv_handle_t* handle);
|
static void uvDestroyConn(uv_handle_t* handle);
|
||||||
|
|
||||||
|
@ -205,13 +209,12 @@ static void uvHandleReq(SSrvConn* pConn) {
|
||||||
}
|
}
|
||||||
if (pConn->status == ConnNormal && pHead->noResp == 0) {
|
if (pConn->status == ConnNormal && pHead->noResp == 0) {
|
||||||
transRefSrvHandle(pConn);
|
transRefSrvHandle(pConn);
|
||||||
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
|
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr),
|
||||||
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
|
ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen);
|
||||||
ntohs(pConn->locaddr.sin_port), transMsg.contLen);
|
|
||||||
} else {
|
} else {
|
||||||
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn,
|
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn, TMSG_INFO(transMsg.msgType),
|
||||||
TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port),
|
||||||
taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp);
|
transMsg.contLen, pHead->noResp);
|
||||||
// no ref here
|
// no ref here
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -318,6 +321,8 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
|
||||||
} else {
|
} else {
|
||||||
tError("fail to dispatch conn to work thread");
|
tError("fail to dispatch conn to work thread");
|
||||||
}
|
}
|
||||||
|
uv_close((uv_handle_t*)req->data, uvFreeCb);
|
||||||
|
// taosMemoryFree(req->data);
|
||||||
taosMemoryFree(req);
|
taosMemoryFree(req);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -349,9 +354,8 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
|
||||||
|
|
||||||
char* msg = (char*)pHead;
|
char* msg = (char*)pHead;
|
||||||
int32_t len = transMsgLenFromCont(pMsg->contLen);
|
int32_t len = transMsgLenFromCont(pMsg->contLen);
|
||||||
tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType),
|
tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr),
|
||||||
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
|
ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
||||||
ntohs(pConn->locaddr.sin_port));
|
|
||||||
pHead->msgLen = htonl(len);
|
pHead->msgLen = htonl(len);
|
||||||
|
|
||||||
wb->base = msg;
|
wb->base = msg;
|
||||||
|
@ -429,11 +433,39 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
|
||||||
(*transAsyncHandle[msg->type])(msg, pThrd);
|
(*transAsyncHandle[msg->type])(msg, pThrd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
static void uvWalkCb(uv_handle_t* handle, void* arg) {
|
||||||
|
if (!uv_is_closing(handle)) {
|
||||||
|
uv_close(handle, NULL);
|
||||||
|
// uv_unref(handle);
|
||||||
|
tDebug("handle: %p -----test----", handle);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#define MAKE_VALGRIND_HAPPY(loop) \
|
||||||
|
do { \
|
||||||
|
uv_walk(loop, uvWalkCb, NULL); \
|
||||||
|
uv_run(loop, UV_RUN_DEFAULT); \
|
||||||
|
uv_loop_close(loop); \
|
||||||
|
} while (0);
|
||||||
|
|
||||||
static void uvAcceptAsyncCb(uv_async_t* async) {
|
static void uvAcceptAsyncCb(uv_async_t* async) {
|
||||||
SServerObj* srv = async->data;
|
SServerObj* srv = async->data;
|
||||||
tDebug("close server port %d", srv->port);
|
tDebug("close server port %d", srv->port);
|
||||||
uv_close((uv_handle_t*)&srv->server, NULL);
|
uv_walk(srv->loop, uvWalkCb, NULL);
|
||||||
uv_stop(srv->loop);
|
// uv_close((uv_handle_t*)async, NULL);
|
||||||
|
// uv_close((uv_handle_t*)&srv->server, NULL);
|
||||||
|
// uv_stop(srv->loop);
|
||||||
|
// uv_print_all_handles(srv->loop, stderr);
|
||||||
|
// int ref = uv_loop_alive(srv->loop);
|
||||||
|
// assert(ref == 0);
|
||||||
|
// tError("active size %d", ref);
|
||||||
|
// uv_stop(srv->loop);
|
||||||
|
// uv_run(srv->loop, UV_RUN_DEFAULT);
|
||||||
|
// fprintf(stderr, "------------------------------------");
|
||||||
|
// uv_print_all_handles(srv->loop, stderr);
|
||||||
|
|
||||||
|
// int ret = uv_loop_close(srv->loop);
|
||||||
|
// tError("(loop)->active_reqs.count: %d, ret: %d", (srv->loop)->active_reqs.count, ret);
|
||||||
|
// assert(ret == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void uvShutDownCb(uv_shutdown_t* req, int status) {
|
static void uvShutDownCb(uv_shutdown_t* req, int status) {
|
||||||
|
@ -455,16 +487,16 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
|
||||||
|
|
||||||
if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
|
if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
|
||||||
uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
|
uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
|
||||||
|
wr->data = cli;
|
||||||
uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
|
uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
|
||||||
|
|
||||||
pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
|
pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
|
||||||
|
|
||||||
tTrace("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
|
tTrace("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
|
||||||
|
|
||||||
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
|
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
|
||||||
} else {
|
} else {
|
||||||
uv_close((uv_handle_t*)cli, NULL);
|
uv_close((uv_handle_t*)cli, NULL);
|
||||||
taosMemoryFree(cli);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
||||||
|
@ -474,7 +506,10 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
||||||
tError("read error %s", uv_err_name(nread));
|
tError("read error %s", uv_err_name(nread));
|
||||||
}
|
}
|
||||||
// TODO(log other failure reason)
|
// TODO(log other failure reason)
|
||||||
// uv_close((uv_handle_t*)q, NULL);
|
tError("failed to create connect: %p", q);
|
||||||
|
taosMemoryFree(buf->base);
|
||||||
|
uv_close((uv_handle_t*)q, NULL);
|
||||||
|
// taosMemoryFree(q);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// free memory allocated by
|
// free memory allocated by
|
||||||
|
@ -650,6 +685,7 @@ static void uvDestroyConn(uv_handle_t* handle) {
|
||||||
|
|
||||||
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
|
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
|
||||||
tTrace("work thread quit");
|
tTrace("work thread quit");
|
||||||
|
// uv_walk(thrd->loop, uvWalkCb, NULL);
|
||||||
uv_loop_close(thrd->loop);
|
uv_loop_close(thrd->loop);
|
||||||
uv_stop(thrd->loop);
|
uv_stop(thrd->loop);
|
||||||
}
|
}
|
||||||
|
@ -713,6 +749,7 @@ End:
|
||||||
void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
|
void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
|
||||||
thrd->quit = true;
|
thrd->quit = true;
|
||||||
if (QUEUE_IS_EMPTY(&thrd->conn)) {
|
if (QUEUE_IS_EMPTY(&thrd->conn)) {
|
||||||
|
// uv_walk(thrd->loop, uvWalkCb, NULL);
|
||||||
uv_loop_close(thrd->loop);
|
uv_loop_close(thrd->loop);
|
||||||
uv_stop(thrd->loop);
|
uv_stop(thrd->loop);
|
||||||
} else {
|
} else {
|
||||||
|
@ -765,8 +802,9 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
taosThreadJoin(pThrd->thread, NULL);
|
taosThreadJoin(pThrd->thread, NULL);
|
||||||
taosMemoryFree(pThrd->loop);
|
// MAKE_VALGRIND_HAPPY(pThrd->loop);
|
||||||
transDestroyAsyncPool(pThrd->asyncPool);
|
transDestroyAsyncPool(pThrd->asyncPool);
|
||||||
|
taosMemoryFree(pThrd->loop);
|
||||||
taosMemoryFree(pThrd);
|
taosMemoryFree(pThrd);
|
||||||
}
|
}
|
||||||
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
|
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
|
||||||
|
@ -784,6 +822,8 @@ void transCloseServer(void* arg) {
|
||||||
uv_async_send(srv->pAcceptAsync);
|
uv_async_send(srv->pAcceptAsync);
|
||||||
taosThreadJoin(srv->thread, NULL);
|
taosThreadJoin(srv->thread, NULL);
|
||||||
|
|
||||||
|
MAKE_VALGRIND_HAPPY(srv->loop);
|
||||||
|
|
||||||
for (int i = 0; i < srv->numOfThreads; i++) {
|
for (int i = 0; i < srv->numOfThreads; i++) {
|
||||||
sendQuitToWorkThrd(srv->pThreadObj[i]);
|
sendQuitToWorkThrd(srv->pThreadObj[i]);
|
||||||
destroyWorkThrd(srv->pThreadObj[i]);
|
destroyWorkThrd(srv->pThreadObj[i]);
|
||||||
|
|
|
@ -15,9 +15,9 @@
|
||||||
|
|
||||||
#define __USE_XOPEN
|
#define __USE_XOPEN
|
||||||
|
|
||||||
|
#include "shellCommand.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "shell.h"
|
#include "shell.h"
|
||||||
#include "shellCommand.h"
|
|
||||||
|
|
||||||
#include <regex.h>
|
#include <regex.h>
|
||||||
|
|
||||||
|
@ -106,8 +106,7 @@ void clearLineBefore(Command *cmd) {
|
||||||
assert(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
|
assert(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
|
||||||
|
|
||||||
clearScreen(cmd->endOffset + prompt_size, cmd->screenOffset + prompt_size);
|
clearScreen(cmd->endOffset + prompt_size, cmd->screenOffset + prompt_size);
|
||||||
memmove(cmd->command, cmd->command + cmd->cursorOffset,
|
memmove(cmd->command, cmd->command + cmd->cursorOffset, cmd->commandSize - cmd->cursorOffset);
|
||||||
cmd->commandSize - cmd->cursorOffset);
|
|
||||||
cmd->commandSize -= cmd->cursorOffset;
|
cmd->commandSize -= cmd->cursorOffset;
|
||||||
cmd->cursorOffset = 0;
|
cmd->cursorOffset = 0;
|
||||||
cmd->screenOffset = 0;
|
cmd->screenOffset = 0;
|
||||||
|
|
|
@ -23,31 +23,34 @@
|
||||||
#include "shellCommand.h"
|
#include "shellCommand.h"
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
|
#include "tconfig.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
#include "ttypes.h"
|
#include "ttypes.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "tconfig.h"
|
|
||||||
|
|
||||||
#include <regex.h>
|
#include <regex.h>
|
||||||
#include <wordexp.h>
|
#include <wordexp.h>
|
||||||
|
|
||||||
/**************** Global variables ****************/
|
/**************** Global variables ****************/
|
||||||
#ifdef _TD_POWER_
|
#ifdef _TD_POWER_
|
||||||
char CLIENT_VERSION[] = "Welcome to the PowerDB shell from %s, Client Version:%s\n"
|
char CLIENT_VERSION[] =
|
||||||
|
"Welcome to the PowerDB shell from %s, Client Version:%s\n"
|
||||||
"Copyright (c) 2020 by PowerDB, Inc. All rights reserved.\n\n";
|
"Copyright (c) 2020 by PowerDB, Inc. All rights reserved.\n\n";
|
||||||
char PROMPT_HEADER[] = "power> ";
|
char PROMPT_HEADER[] = "power> ";
|
||||||
|
|
||||||
char CONTINUE_PROMPT[] = " -> ";
|
char CONTINUE_PROMPT[] = " -> ";
|
||||||
int prompt_size = 7;
|
int prompt_size = 7;
|
||||||
#elif (_TD_TQ_ == true)
|
#elif (_TD_TQ_ == true)
|
||||||
char CLIENT_VERSION[] = "Welcome to the TQ shell from %s, Client Version:%s\n"
|
char CLIENT_VERSION[] =
|
||||||
|
"Welcome to the TQ shell from %s, Client Version:%s\n"
|
||||||
"Copyright (c) 2020 by TQ, Inc. All rights reserved.\n\n";
|
"Copyright (c) 2020 by TQ, Inc. All rights reserved.\n\n";
|
||||||
char PROMPT_HEADER[] = "tq> ";
|
char PROMPT_HEADER[] = "tq> ";
|
||||||
|
|
||||||
char CONTINUE_PROMPT[] = " -> ";
|
char CONTINUE_PROMPT[] = " -> ";
|
||||||
int prompt_size = 4;
|
int prompt_size = 4;
|
||||||
#else
|
#else
|
||||||
char CLIENT_VERSION[] = "Welcome to the TDengine shell from %s, Client Version:%s\n"
|
char CLIENT_VERSION[] =
|
||||||
|
"Welcome to the TDengine shell from %s, Client Version:%s\n"
|
||||||
"Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n";
|
"Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n";
|
||||||
char PROMPT_HEADER[] = "taos> ";
|
char PROMPT_HEADER[] = "taos> ";
|
||||||
|
|
||||||
|
@ -521,7 +524,8 @@ static int dumpResultToFile(const char *fname, TAOS_RES *tres) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// FILE *fp = fopen(full_path.we_wordv[0], "w");
|
// FILE *fp = fopen(full_path.we_wordv[0], "w");
|
||||||
TdFilePtr pFile = taosOpenFile(full_path.we_wordv[0], TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
TdFilePtr pFile =
|
||||||
|
taosOpenFile(full_path.we_wordv[0], TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
||||||
if (pFile == NULL) {
|
if (pFile == NULL) {
|
||||||
fprintf(stderr, "ERROR: failed to open file: %s\n", full_path.we_wordv[0]);
|
fprintf(stderr, "ERROR: failed to open file: %s\n", full_path.we_wordv[0]);
|
||||||
wordfree(&full_path);
|
wordfree(&full_path);
|
||||||
|
|
|
@ -14,27 +14,20 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#define __USE_XOPEN
|
#define __USE_XOPEN
|
||||||
#include "os.h"
|
|
||||||
#include "shell.h"
|
|
||||||
#include "tglobal.h"
|
|
||||||
#include "tconfig.h"
|
|
||||||
#include "shellCommand.h"
|
#include "shellCommand.h"
|
||||||
#include "tbase64.h"
|
#include "tglobal.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "version.h"
|
|
||||||
|
|
||||||
#include <wordexp.h>
|
|
||||||
#include <argp.h>
|
#include <argp.h>
|
||||||
#include <termio.h>
|
#include <termio.h>
|
||||||
|
#include <wordexp.h>
|
||||||
|
|
||||||
#define OPT_ABORT 1 /* abort */
|
#define OPT_ABORT 1 /* abort */
|
||||||
|
|
||||||
|
|
||||||
int indicator = 1;
|
int indicator = 1;
|
||||||
|
|
||||||
void insertChar(Command *cmd, char *c, int size);
|
void insertChar(Command *cmd, char *c, int size);
|
||||||
void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen,
|
void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen, int32_t pkgNum, char *pkgType);
|
||||||
int32_t pkgNum, char *pkgType);
|
|
||||||
const char *argp_program_version = version;
|
const char *argp_program_version = version;
|
||||||
const char *argp_program_bug_address = "<support@taosdata.com>";
|
const char *argp_program_bug_address = "<support@taosdata.com>";
|
||||||
static char doc[] = "";
|
static char doc[] = "";
|
||||||
|
@ -45,14 +38,14 @@ static tsem_t cancelSem;
|
||||||
|
|
||||||
static struct argp_option options[] = {
|
static struct argp_option options[] = {
|
||||||
{"host", 'h', "HOST", 0, "TDengine server FQDN to connect. The default host is localhost."},
|
{"host", 'h', "HOST", 0, "TDengine server FQDN to connect. The default host is localhost."},
|
||||||
{"password", 'p', 0, 0, "The password to use when connecting to the server."},
|
{"password", 'p', NULL, 0, "The password to use when connecting to the server."},
|
||||||
{"port", 'P', "PORT", 0, "The TCP/IP port number to use for the connection."},
|
{"port", 'P', "PORT", 0, "The TCP/IP port number to use for the connection."},
|
||||||
{"user", 'u', "USER", 0, "The user name to use when connecting to the server."},
|
{"user", 'u', "USER", 0, "The user name to use when connecting to the server."},
|
||||||
{"auth", 'A', "Auth", 0, "The auth string to use when connecting to the server."},
|
{"auth", 'A', "Auth", 0, "The auth string to use when connecting to the server."},
|
||||||
{"config-dir", 'c', "CONFIG_DIR", 0, "Configuration directory."},
|
{"config-dir", 'c', "CONFIG_DIR", 0, "Configuration directory."},
|
||||||
{"dump-config", 'C', 0, 0, "Dump configuration."},
|
{"dump-config",'C', NULL, 0, "Dump configuration."},
|
||||||
{"commands", 's', "COMMANDS", 0, "Commands to run without enter the shell."},
|
{"commands", 's', "COMMANDS", 0, "Commands to run without enter the shell."},
|
||||||
{"raw-time", 'r', 0, 0, "Output time as uint64_t."},
|
{"raw-time", 'r', NULL, 0, "Output time as uint64_t."},
|
||||||
{"file", 'f', "FILE", 0, "Script to run without enter the shell."},
|
{"file", 'f', "FILE", 0, "Script to run without enter the shell."},
|
||||||
{"directory", 'D', "DIRECTORY", 0, "Use multi-thread to import all SQL files in the directory separately."},
|
{"directory", 'D', "DIRECTORY", 0, "Use multi-thread to import all SQL files in the directory separately."},
|
||||||
{"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."},
|
{"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."},
|
||||||
|
@ -182,18 +175,16 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
|
||||||
/* Our argp parser. */
|
/* Our argp parser. */
|
||||||
static struct argp argp = {options, parse_opt, args_doc, doc};
|
static struct argp argp = {options, parse_opt, args_doc, doc};
|
||||||
|
|
||||||
char LINUXCLIENT_VERSION[] = "Welcome to the TDengine shell from %s, Client Version:%s\n"
|
char LINUXCLIENT_VERSION[] =
|
||||||
|
"Welcome to the TDengine shell from %s, Client Version:%s\n"
|
||||||
"Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n";
|
"Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n";
|
||||||
char g_password[SHELL_MAX_PASSWORD_LEN];
|
char g_password[SHELL_MAX_PASSWORD_LEN];
|
||||||
|
|
||||||
static void parse_args(
|
static void parse_args(int argc, char *argv[], SShellArguments *arguments) {
|
||||||
int argc, char *argv[], SShellArguments *arguments) {
|
|
||||||
for (int i = 1; i < argc; i++) {
|
for (int i = 1; i < argc; i++) {
|
||||||
if ((strncmp(argv[i], "-p", 2) == 0)
|
if ((strncmp(argv[i], "-p", 2) == 0) || (strncmp(argv[i], "--password", 10) == 0)) {
|
||||||
|| (strncmp(argv[i], "--password", 10) == 0)) {
|
|
||||||
printf(LINUXCLIENT_VERSION, tsOsName, taos_get_client_info());
|
printf(LINUXCLIENT_VERSION, tsOsName, taos_get_client_info());
|
||||||
if ((strlen(argv[i]) == 2)
|
if ((strlen(argv[i]) == 2) || (strncmp(argv[i], "--password", 10) == 0)) {
|
||||||
|| (strncmp(argv[i], "--password", 10) == 0)) {
|
|
||||||
printf("Enter password: ");
|
printf("Enter password: ");
|
||||||
taosSetConsoleEcho(false);
|
taosSetConsoleEcho(false);
|
||||||
if (scanf("%20s", g_password) > 1) {
|
if (scanf("%20s", g_password) > 1) {
|
||||||
|
@ -535,9 +526,7 @@ void exitShell() {
|
||||||
taos_cleanup();
|
taos_cleanup();
|
||||||
exit(EXIT_SUCCESS);
|
exit(EXIT_SUCCESS);
|
||||||
}
|
}
|
||||||
void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context) {
|
void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context) { tsem_post(&cancelSem); }
|
||||||
tsem_post(&cancelSem);
|
|
||||||
}
|
|
||||||
|
|
||||||
void *cancelHandler(void *arg) {
|
void *cancelHandler(void *arg) {
|
||||||
setThreadName("cancelHandler");
|
setThreadName("cancelHandler");
|
||||||
|
@ -640,11 +629,11 @@ int main(int argc, char *argv[]) {
|
||||||
con = taos_connect_auth(args.host, args.user, args.auth, args.database, args.port);
|
con = taos_connect_auth(args.host, args.user, args.auth, args.database, args.port);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* if (taos_init()) {
|
// if (taos_init()) {
|
||||||
printf("Failed to init taos");
|
// printf("Failed to init taos");
|
||||||
exit(EXIT_FAILURE);
|
// exit(EXIT_FAILURE);
|
||||||
}
|
// }
|
||||||
*/
|
|
||||||
taosNetTest(args.netTestRole, args.host, args.port, args.pktLen, args.pktNum, args.pktType);
|
taosNetTest(args.netTestRole, args.host, args.port, args.pktLen, args.pktNum, args.pktType);
|
||||||
taos_close(con);
|
taos_close(con);
|
||||||
exit(0);
|
exit(0);
|
||||||
|
|
Loading…
Reference in New Issue