[td-11818] merge 3.0

This commit is contained in:
Haojun Liao 2022-01-10 23:20:43 +08:00
commit a2545549bd
60 changed files with 551 additions and 532 deletions

View File

@ -133,36 +133,36 @@ typedef enum _mgmt_table {
#define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC)
#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0)
typedef struct SKv {
typedef struct {
int32_t keyLen;
int32_t valueLen;
void* key;
void* value;
} SKv;
typedef struct SClientHbKey {
typedef struct {
int32_t connId;
int32_t hbType;
} SClientHbKey;
typedef struct SClientHbReq {
typedef struct {
SClientHbKey connKey;
SHashObj* info; // hash<Slv.key, Sklv>
} SClientHbReq;
typedef struct SClientHbBatchReq {
typedef struct {
int64_t reqId;
SArray* reqs; // SArray<SClientHbReq>
} SClientHbBatchReq;
typedef struct SClientHbRsp {
typedef struct {
SClientHbKey connKey;
int32_t status;
int32_t bodyLen;
void* body;
} SClientHbRsp;
typedef struct SClientHbBatchRsp {
typedef struct {
int64_t reqId;
int64_t rspId;
SArray* rsps; // SArray<SClientHbRsp>
@ -220,13 +220,13 @@ static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey)
return buf;
}
typedef struct SBuildTableMetaInput {
typedef struct {
int32_t vgId;
char* dbName;
char* tableFullName;
} SBuildTableMetaInput;
typedef struct SBuildUseDBInput {
typedef struct {
char db[TSDB_TABLE_FNAME_LEN];
int32_t vgVersion;
} SBuildUseDBInput;
@ -234,17 +234,12 @@ typedef struct SBuildUseDBInput {
#pragma pack(push, 1)
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
typedef struct {
char fqdn[TSDB_FQDN_LEN];
uint16_t port;
} SEpAddrMsg;
typedef struct {
char fqdn[TSDB_FQDN_LEN];
uint16_t port;
} SEpAddr;
typedef struct SMsgHead {
typedef struct {
int32_t contLen;
int32_t vgId;
} SMsgHead;
@ -262,7 +257,7 @@ typedef struct SSubmitBlk {
} SSubmitBlk;
// Submit message for this TSDB
typedef struct SSubmitMsg {
typedef struct {
SMsgHead header;
int64_t version;
int32_t length;
@ -301,7 +296,7 @@ typedef struct {
int32_t failedRows; // number of failed records (exclude duplicate records)
int32_t numOfFailedBlocks;
SShellSubmitRspBlock failedBlocks[];
} SShellSubmitRspMsg;
} SShellSubmitRsp;
typedef struct SSchema {
int8_t type;
@ -310,98 +305,24 @@ typedef struct SSchema {
char name[TSDB_COL_NAME_LEN];
} SSchema;
typedef struct {
int32_t contLen;
int32_t vgId;
int8_t tableType;
int16_t numOfColumns;
int16_t numOfTags;
int32_t tid;
int32_t sversion;
int32_t tversion;
int32_t tagDataLen;
int32_t sqlDataLen;
uint64_t uid;
uint64_t superTableUid;
uint64_t createdTime;
char tableFname[TSDB_TABLE_FNAME_LEN];
char stbFname[TSDB_TABLE_FNAME_LEN];
char data[];
} SMDCreateTableMsg;
// typedef struct {
// int32_t len; // one create table message
// char tableName[TSDB_TABLE_FNAME_LEN];
// int16_t numOfColumns;
// int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string
// int8_t igExists;
// int8_t rspMeta;
// int8_t reserved[16];
// char schema[];
//} SCreateTableMsg;
typedef struct {
char tableName[TSDB_TABLE_FNAME_LEN];
int16_t numOfColumns;
int16_t numOfTags;
int8_t igExists;
int8_t rspMeta;
char schema[];
} SCreateCTableMsg;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t igExists;
int32_t numOfTags;
int32_t numOfColumns;
SSchema pSchema[];
} SCreateStbMsg, SCreateTableMsg;
} SMCreateStbReq;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t igNotExists;
} SDropStbMsg;
} SMDropStbReq;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t alterType;
SSchema schema;
} SAlterStbMsg;
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
uint64_t suid;
} SVDropStbReq;
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
int8_t type; /* operation type */
int32_t numOfCols; /* number of schema */
int32_t numOfTags;
char data[];
} SAlterTableMsg;
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
int8_t ignoreNotExists;
} SDropTableMsg;
typedef struct {
SMsgHead head;
int64_t uid;
int32_t tid;
int16_t tversion;
int16_t colId;
int8_t type;
int16_t bytes;
int32_t tagValLen;
int16_t numOfTags;
int32_t schemaLen;
char data[];
} SUpdateTableTagValMsg;
} SMAlterStbReq;
typedef struct {
int32_t pid;
@ -470,28 +391,13 @@ typedef struct {
} SCreateUserReq, SAlterUserReq;
typedef struct {
int32_t contLen;
int32_t vgId;
int32_t tid;
uint64_t uid;
char tableFname[TSDB_TABLE_FNAME_LEN];
} SMDDropTableMsg;
typedef struct {
int32_t contLen;
int32_t vgId;
uint64_t uid;
char tableFname[TSDB_TABLE_FNAME_LEN];
} SDropSTableMsg;
typedef struct SColIndex {
int16_t colId; // column id
int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag
int16_t flag; // denote if it is a tag or a normal column
char name[TSDB_DB_FNAME_LEN];
} SColIndex;
typedef struct SColumnFilterInfo {
typedef struct {
int16_t lowerRelOptr;
int16_t upperRelOptr;
int16_t filterstr; // denote if current column is char(binary/nchar)
@ -512,7 +418,7 @@ typedef struct SColumnFilterInfo {
};
} SColumnFilterInfo;
typedef struct SColumnFilterList {
typedef struct {
int16_t numOfFilters;
union {
int64_t placeholder;
@ -523,14 +429,14 @@ typedef struct SColumnFilterList {
* for client side struct, we only need the column id, type, bytes are not necessary
* But for data in vnode side, we need all the following information.
*/
typedef struct SColumnInfo {
typedef struct {
int16_t colId;
int16_t type;
int16_t bytes;
SColumnFilterList flist;
} SColumnInfo;
typedef struct STableIdInfo {
typedef struct {
uint64_t uid;
TSKEY key; // last accessed ts, for subscription
} STableIdInfo;
@ -547,7 +453,7 @@ typedef struct {
int32_t tsOrder; // ts comp block order
} STsBufInfo;
typedef struct SInterval {
typedef struct {
int32_t tz; // query client timezone
char intervalUnit;
char slidingUnit;
@ -606,7 +512,7 @@ typedef struct {
int32_t udfContentOffset;
int32_t udfContentLen;
SColumnInfo tableCols[];
} SQueryTableMsg;
} SQueryTableReq;
typedef struct {
int32_t code;
@ -623,7 +529,7 @@ typedef struct {
int8_t free;
} SRetrieveTableReq;
typedef struct SRetrieveTableRsp {
typedef struct {
int64_t useconds;
int8_t completed; // all results are returned to client
int8_t precision;
@ -655,7 +561,7 @@ typedef struct {
int8_t update;
int8_t cacheLastRow;
int8_t ignoreExist;
} SCreateDbMsg;
} SCreateDbReq;
typedef struct {
char db[TSDB_DB_FNAME_LEN];
@ -667,25 +573,25 @@ typedef struct {
int8_t walLevel;
int8_t quorum;
int8_t cacheLastRow;
} SAlterDbMsg;
} SAlterDbReq;
typedef struct {
char db[TSDB_TABLE_FNAME_LEN];
int8_t ignoreNotExists;
} SDropDbMsg;
} SDropDbReq;
typedef struct {
char db[TSDB_TABLE_FNAME_LEN];
int32_t vgVersion;
} SUseDbMsg;
} SUseDbReq;
typedef struct {
char db[TSDB_TABLE_FNAME_LEN];
} SSyncDbMsg;
} SSyncDbReq;
typedef struct {
char db[TSDB_TABLE_FNAME_LEN];
} SCompactDbMsg;
} SCompactDbReq;
typedef struct {
char name[TSDB_FUNC_NAME_LEN];
@ -699,16 +605,16 @@ typedef struct {
int32_t commentSize;
int32_t codeSize;
char pCont[];
} SCreateFuncMsg;
} SCreateFuncReq;
typedef struct {
char name[TSDB_FUNC_NAME_LEN];
} SDropFuncMsg;
} SDropFuncReq;
typedef struct {
int32_t numOfFuncs;
char pFuncNames[];
} SRetrieveFuncMsg;
} SRetrieveFuncReq;
typedef struct {
char name[TSDB_FUNC_NAME_LEN];
@ -768,7 +674,7 @@ typedef struct {
typedef struct {
int32_t reserved;
} STransMsg;
} STransReq;
typedef struct {
int32_t dnodeId;
@ -843,7 +749,7 @@ typedef struct {
SMsgHead header;
char dbFname[TSDB_DB_FNAME_LEN];
char tableFname[TSDB_TABLE_FNAME_LEN];
} STableInfoMsg;
} STableInfoReq;
typedef struct {
int8_t metaClone; // create local clone of the cached table meta
@ -851,7 +757,7 @@ typedef struct {
int32_t numOfTables;
int32_t numOfUdfs;
char tableNames[];
} SMultiTableInfoMsg;
} SMultiTableInfoReq;
typedef struct SVgroupInfo {
int32_t vgId;
@ -859,19 +765,19 @@ typedef struct SVgroupInfo {
uint32_t hashEnd;
int8_t inUse;
int8_t numOfEps;
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
SEpAddr epAddr[TSDB_MAX_REPLICA];
} SVgroupInfo;
typedef struct {
int32_t vgId;
int8_t numOfEps;
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
SEpAddr epAddr[TSDB_MAX_REPLICA];
} SVgroupMsg;
typedef struct {
int32_t numOfVgroups;
SVgroupMsg vgroups[];
} SVgroupsMsg, SVgroupsInfo;
} SVgroupsInfo;
typedef struct {
char tbFname[TSDB_TABLE_FNAME_LEN]; // table full name
@ -888,9 +794,9 @@ typedef struct {
uint64_t tuid;
int32_t vgId;
SSchema pSchema[];
} STableMetaMsg;
} STableMetaRsp;
typedef struct SMultiTableMeta {
typedef struct {
int32_t numOfTables;
int32_t numOfVgroup;
int32_t numOfUdf;
@ -932,11 +838,11 @@ typedef struct {
char db[TSDB_DB_FNAME_LEN];
int32_t numOfVgroup;
int32_t vgid[];
} SCompactMsg;
} SCompactReq;
typedef struct SShowRsp {
typedef struct {
int64_t showId;
STableMetaMsg tableMeta;
STableMetaRsp tableMeta;
} SShowRsp;
typedef struct {
@ -975,17 +881,6 @@ typedef struct {
int32_t dnodeId;
} SMCreateBnodeReq, SMDropBnodeReq, SDCreateBnodeReq, SDDropBnodeReq;
typedef struct {
int32_t dnodeId;
int32_t vgId;
int32_t tid;
} SConfigTableMsg;
typedef struct {
int32_t dnodeId;
int32_t vgId;
} SConfigVnodeMsg;
typedef struct {
char sql[TSDB_SHOW_SQL_LEN];
int32_t queryId;
@ -1042,7 +937,7 @@ typedef struct {
int8_t align[7];
char name[TSDB_STEP_NAME_LEN];
char desc[TSDB_STEP_DESC_LEN];
} SStartupMsg;
} SStartupReq;
// mq related
typedef struct {
@ -1083,46 +978,6 @@ typedef struct {
} SSubmitReqReader;
typedef struct {
/* data */
} SCreateTableReq;
typedef struct {
/* data */
} SCreateTableRsp;
typedef struct {
/* data */
} SDropTableReq;
typedef struct {
/* data */
} SDropTableRsp;
typedef struct {
/* data */
} SAlterTableReq;
typedef struct {
/* data */
} SAlterTableRsp;
typedef struct {
/* data */
} SDropStableReq;
typedef struct {
/* data */
} SDropStableRsp;
typedef struct {
/* data */
} SUpdateTagValReq;
typedef struct {
/* data */
} SUpdateTagValRsp;
typedef struct SSubQueryMsg {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
@ -1131,59 +986,59 @@ typedef struct SSubQueryMsg {
char msg[];
} SSubQueryMsg;
typedef struct SResReadyMsg {
typedef struct {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
} SResReadyMsg;
} SResReadyReq;
typedef struct SResReadyRsp {
typedef struct {
int32_t code;
} SResReadyRsp;
typedef struct SResFetchMsg {
typedef struct {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
} SResFetchMsg;
} SResFetchReq;
typedef struct SSchTasksStatusMsg {
typedef struct {
SMsgHead header;
uint64_t sId;
} SSchTasksStatusMsg;
} SSchTasksStatusReq;
typedef struct STaskStatus {
typedef struct {
uint64_t queryId;
uint64_t taskId;
int8_t status;
} STaskStatus;
typedef struct SSchedulerStatusRsp {
typedef struct {
uint32_t num;
STaskStatus status[];
} SSchedulerStatusRsp;
typedef struct STaskCancelMsg {
typedef struct {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
} STaskCancelMsg;
} STaskCancelReq;
typedef struct STaskCancelRsp {
typedef struct {
int32_t code;
} STaskCancelRsp;
typedef struct STaskDropMsg {
typedef struct {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
} STaskDropMsg;
} STaskDropReq;
typedef struct STaskDropRsp {
typedef struct {
int32_t code;
} STaskDropRsp;
@ -1334,18 +1189,18 @@ typedef struct {
void* executor;
int32_t sqlLen;
char* sql;
} SCreateTopicMsg;
} SCreateTopicReq;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t igNotExists;
} SDropTopicMsg;
} SDropTopicReq;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t alterType;
SSchema schema;
} SAlterTopicMsg;
} SAlterTopicReq;
typedef struct {
SMsgHead head;
@ -1356,13 +1211,13 @@ typedef struct {
char* executor;
int32_t sqlLen;
char* sql;
} SDCreateTopicMsg;
} SDCreateTopicReq;
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
uint64_t tuid;
} SDDropTopicMsg;
} SDDropTopicReq;
typedef struct SVCreateTbReq {
uint64_t ver; // use a general definition
@ -1402,24 +1257,63 @@ void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
int tSVCreateTbBatchReqSerialize(void** buf, SVCreateTbBatchReq* pReq);
void* tSVCreateTbBatchReqDeserialize(void* buf, SVCreateTbBatchReq* pReq);
typedef struct SVCreateTbRsp {
typedef struct {
SMsgHead head;
} SVCreateTbRsp;
typedef struct SVShowTablesReq {
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
int8_t ignoreNotExists;
} SVAlterTbReq;
typedef struct {
SMsgHead head;
} SVAlterTbRsp;
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
int8_t ignoreNotExists;
} SVDropTbReq;
typedef struct {
SMsgHead head;
} SVDropTbRsp;
typedef struct {
SMsgHead head;
int64_t uid;
int32_t tid;
int16_t tversion;
int16_t colId;
int8_t type;
int16_t bytes;
int32_t tagValLen;
int16_t numOfTags;
int32_t schemaLen;
char data[];
} SUpdateTagValReq;
typedef struct {
SMsgHead head;
} SUpdateTagValRsp;
typedef struct {
SMsgHead head;
} SVShowTablesReq;
typedef struct SVShowTablesRsp {
typedef struct {
int64_t id;
STableMetaMsg metaInfo;
STableMetaRsp metaInfo;
} SVShowTablesRsp;
typedef struct SVShowTablesFetchReq {
typedef struct {
SMsgHead head;
int32_t id;
} SVShowTablesFetchReq;
typedef struct SVShowTablesFetchRsp {
typedef struct {
int64_t useconds;
int8_t completed; // all results are returned to client
int8_t precision;

View File

@ -40,7 +40,7 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskI
* @param qinfo
* @return
*/
bool qExecTask(qTaskInfo_t qTask);
bool qExecTask(qTaskInfo_t qTask, SSDataBlock** pRes);
/**
* Retrieve the produced results information, if current query is not paused or completed,

View File

@ -114,7 +114,7 @@ typedef struct SProjectPhyNode {
typedef struct SExchangePhyNode {
SPhyNode node;
uint64_t srcTemplateId; // template id of datasource suplans
SArray *pSrcEndPoints; // SEpAddrMsg, scheduler fill by calling qSetSuplanExecutionNode
SArray *pSrcEndPoints; // SEpAddr, scheduler fill by calling qSetSuplanExecutionNode
} SExchangePhyNode;
typedef struct SSubplanId {

View File

@ -105,7 +105,7 @@ typedef struct SQueryNodeAddr{
int32_t nodeId; // vgId or qnodeId
int8_t inUse;
int8_t numOfEps;
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
SEpAddr epAddr[TSDB_MAX_REPLICA];
} SQueryNodeAddr;
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);

View File

@ -26,14 +26,10 @@ extern "C" {
#define TD_MOD_UNINITIALIZED 0
#define TD_MOD_INITIALIZED 1
#define TD_MOD_UNCLEARD 0
#define TD_MOD_CLEARD 1
typedef int8_t td_mode_flag_t;
#define TD_CHECK_AND_SET_MODE_INIT(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNINITIALIZED, TD_MOD_INITIALIZED)
#define TD_CHECK_AND_SET_MOD_CLEAR(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNCLEARD, TD_MOD_CLEARD)
#define TD_CHECK_AND_SET_MOD_CLEAR(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_INITIALIZED, TD_MOD_UNINITIALIZED)
#define TD_IS_NULL(PTR) ((PTR) == NULL)

View File

@ -130,7 +130,7 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SShowRsp* pShow = (SShowRsp *)pMsg->pData;
pShow->showId = htobe64(pShow->showId);
STableMetaMsg *pMetaMsg = &(pShow->tableMeta);
STableMetaRsp *pMetaMsg = &(pShow->tableMeta);
pMetaMsg->numOfColumns = htonl(pMetaMsg->numOfColumns);
SSchema* pSchema = pMetaMsg->pSchema;

View File

@ -167,7 +167,7 @@ typedef struct SDnode {
SBnodeMgmt bmgmt;
SVnodesMgmt vmgmt;
STransMgmt tmgmt;
SStartupMsg startup;
SStartupReq startup;
} SDnode;
EStat dndGetStat(SDnode *pDnode);
@ -175,7 +175,7 @@ void dndSetStat(SDnode *pDnode, EStat stat);
char *dndStatStr(EStat stat);
void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc);
void dndGetStartup(SDnode *pDnode, SStartupMsg *pStartup);
void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup);
#ifdef __cplusplus
}

View File

@ -21,8 +21,9 @@ extern "C" {
#endif
#include "dndInt.h"
int32_t dndInitDnode(SDnode *pDnode);
void dndCleanupDnode(SDnode *pDnode);
int32_t dndInitMgmt(SDnode *pDnode);
void dndStopMgmt(SDnode *pDnode);
void dndCleanupMgmt(SDnode *pDnode);
int32_t dndGetDnodeId(SDnode *pDnode);
int64_t dndGetClusterId(SDnode *pDnode);

View File

@ -473,12 +473,12 @@ static int32_t dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("startup req is received");
SStartupMsg *pStartup = rpcMallocCont(sizeof(SStartupMsg));
SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq));
dndGetStartup(pDnode, pStartup);
dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupMsg)};
SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq)};
rpcSendResponse(&rpcRsp);
}
@ -497,7 +497,7 @@ static void *dnodeThreadRoutine(void *param) {
}
}
int32_t dndInitDnode(SDnode *pDnode) {
int32_t dndInitMgmt(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
pMgmt->dnodeId = 0;
@ -547,16 +547,18 @@ int32_t dndInitDnode(SDnode *pDnode) {
return 0;
}
void dndCleanupDnode(SDnode *pDnode) {
void dndStopMgmt(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
dndCleanupWorker(&pMgmt->mgmtWorker);
if (pMgmt->threadId != NULL) {
taosDestoryThread(pMgmt->threadId);
pMgmt->threadId = NULL;
}
}
void dndCleanupMgmt(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosWLockLatch(&pMgmt->latch);
if (pMgmt->dnodeEps != NULL) {

View File

@ -167,6 +167,11 @@ static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
dDebug("vgId:%d, vnode is closed", pVnode->vgId);
if (pVnode->dropped) {
dDebug("vgId:%d, vnode is destroyed for dropped:%d", pVnode->vgId, pVnode->dropped);
vnodeDestroy(pVnode->path);
}
free(pVnode->path);
free(pVnode->db);
free(pVnode);
@ -466,6 +471,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
}
static void dndCloseVnodes(SDnode *pDnode) {
dInfo("start to close all vnodes");
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
int32_t numOfVnodes = 0;
@ -658,8 +664,6 @@ int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
}
dndCloseVnode(pDnode, pVnode);
vnodeClose(pVnode->pImpl);
vnodeDestroy(pVnode->path);
dndWriteVnodesToFile(pDnode);
return 0;

View File

@ -46,14 +46,14 @@ char *dndStatStr(EStat stat) {
}
void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc) {
SStartupMsg *pStartup = &pDnode->startup;
SStartupReq *pStartup = &pDnode->startup;
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
pStartup->finished = 0;
}
void dndGetStartup(SDnode *pDnode, SStartupMsg *pStartup) {
memcpy(pStartup, &pDnode->startup, sizeof(SStartupMsg));
void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) {
memcpy(pStartup, &pDnode->startup, sizeof(SStartupReq));
pStartup->finished = (dndGetStat(pDnode) == DND_STAT_RUNNING);
}
@ -202,7 +202,7 @@ SDnode *dndInit(SDnodeOpt *pOption) {
return NULL;
}
if (dndInitDnode(pDnode) != 0) {
if (dndInitMgmt(pDnode) != 0) {
dError("failed to init dnode");
dndCleanup(pDnode);
return NULL;
@ -263,12 +263,13 @@ void dndCleanup(SDnode *pDnode) {
dInfo("start to cleanup TDengine");
dndSetStat(pDnode, DND_STAT_STOPPED);
dndCleanupTrans(pDnode);
dndStopMgmt(pDnode);
dndCleanupMnode(pDnode);
dndCleanupBnode(pDnode);
dndCleanupSnode(pDnode);
dndCleanupQnode(pDnode);
dndCleanupVnodes(pDnode);
dndCleanupDnode(pDnode);
dndCleanupMgmt(pDnode);
vnodeClear();
tfsDestroy();
walCleanUp();

View File

@ -53,7 +53,7 @@ class Testbase {
void SendShowMetaReq(int8_t showType, const char* db);
void SendShowRetrieveReq();
STableMetaMsg* GetShowMeta();
STableMetaRsp* GetShowMeta();
SRetrieveTableRsp* GetRetrieveRsp();
int32_t GetMetaNum();
@ -74,7 +74,7 @@ class Testbase {
private:
int64_t showId;
STableMetaMsg* pMeta;
STableMetaRsp* pMeta;
SRetrieveTableRsp* pRetrieveRsp;
char* pData;
int32_t pos;

View File

@ -179,6 +179,6 @@ const char* Testbase::GetShowBinary(int32_t len) {
int32_t Testbase::GetShowRows() { return pRetrieveRsp->numOfRows; }
STableMetaMsg* Testbase::GetShowMeta() { return pMeta; }
STableMetaRsp* Testbase::GetShowMeta() { return pMeta; }
SRetrieveTableRsp* Testbase::GetRetrieveRsp() { return pRetrieveRsp; }

View File

@ -32,7 +32,7 @@ extern "C" {
typedef int32_t (*MndMsgFp)(SMnodeMsg *pMsg);
typedef int32_t (*MndInitFp)(SMnode *pMnode);
typedef void (*MndCleanupFp)(SMnode *pMnode);
typedef int32_t (*ShowMetaFp)(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
typedef int32_t (*ShowMetaFp)(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
typedef int32_t (*ShowRetrieveFp)(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter);

View File

@ -31,7 +31,7 @@ static int32_t mndProcessCreateBnodeReq(SMnodeMsg *pReq);
static int32_t mndProcessDropBnodeReq(SMnodeMsg *pReq);
static int32_t mndProcessCreateBnodeRsp(SMnodeMsg *pRsp);
static int32_t mndProcessDropBnodeRsp(SMnodeMsg *pRsp);
static int32_t mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveBnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextBnode(SMnode *pMnode, void *pIter);
@ -391,7 +391,7 @@ static int32_t mndProcessDropBnodeRsp(SMnodeMsg *pRsp) {
return 0;
}
static int32_t mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;

View File

@ -26,7 +26,7 @@ static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster);
static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster);
static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOldCluster, SClusterObj *pNewCluster);
static int32_t mndCreateDefaultCluster(SMnode *pMnode);
static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveClusters(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter);
@ -163,7 +163,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
return sdbWrite(pMnode->pSdb, pRaw);
}
static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;

View File

@ -39,7 +39,7 @@ static int32_t mndProcessCreateConsumerMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropConsumerMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropConsumerInRsp(SMnodeMsg *pMsg);
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg);
static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
@ -395,7 +395,7 @@ static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
STableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
mDebug("consumer:%s, start to retrieve meta", pInfo->tableFname);
@ -417,9 +417,9 @@ static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
taosRLockLatch(&pConsumer->lock);
int32_t totalCols = pConsumer->numOfColumns + pConsumer->numOfTags;
int32_t contLen = sizeof(STableMetaMsg) + totalCols * sizeof(SSchema);
int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema);
STableMetaMsg *pMeta = rpcMallocCont(contLen);
STableMetaRsp *pMeta = rpcMallocCont(contLen);
if (pMeta == NULL) {
taosRUnLockLatch(&pConsumer->lock);
mndReleaseDb(pMnode, pDb);
@ -483,7 +483,7 @@ static int32_t mndGetNumOfConsumers(SMnode *pMnode, char *dbName, int32_t *pNumO
return 0;
}
static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;

View File

@ -35,7 +35,7 @@ static int32_t mndProcessDropDbReq(SMnodeMsg *pReq);
static int32_t mndProcessUseDbReq(SMnodeMsg *pReq);
static int32_t mndProcessSyncDbReq(SMnodeMsg *pReq);
static int32_t mndProcessCompactDbReq(SMnodeMsg *pReq);
static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextDb(SMnode *pMnode, void *pIter);
@ -194,7 +194,7 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) {
SDbObj *mndAcquireDb(SMnode *pMnode, char *db) {
SSdb *pSdb = pMnode->pSdb;
SDbObj *pDb = sdbAcquire(pSdb, SDB_DB, db);
if (pDb == NULL) {
if (pDb == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
}
return pDb;
@ -378,7 +378,7 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
return 0;
}
static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pReq, SCreateDbMsg *pCreate, SUserObj *pUser) {
static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pReq, SCreateDbReq *pCreate, SUserObj *pUser) {
SDbObj dbObj = {0};
memcpy(dbObj.name, pCreate->db, TSDB_DB_FNAME_LEN);
memcpy(dbObj.acct, pUser->acct, TSDB_USER_LEN);
@ -449,7 +449,7 @@ CREATE_DB_OVER:
static int32_t mndProcessCreateDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SCreateDbMsg *pCreate = pReq->rpcMsg.pCont;
SCreateDbReq *pCreate = pReq->rpcMsg.pCont;
pCreate->numOfVgroups = htonl(pCreate->numOfVgroups);
pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize);
@ -495,7 +495,7 @@ static int32_t mndProcessCreateDbReq(SMnodeMsg *pReq) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static int32_t mndSetDbCfgFromAlterDbMsg(SDbObj *pDb, SAlterDbMsg *pAlter) {
static int32_t mndSetDbCfgFromAlterDbMsg(SDbObj *pDb, SAlterDbReq *pAlter) {
terrno = TSDB_CODE_MND_DB_OPTION_UNCHANGED;
if (pAlter->totalBlocks >= 0 && pAlter->totalBlocks != pDb->cfg.totalBlocks) {
@ -646,7 +646,7 @@ UPDATE_DB_OVER:
static int32_t mndProcessAlterDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SAlterDbMsg *pAlter = pReq->rpcMsg.pCont;
SAlterDbReq *pAlter = pReq->rpcMsg.pCont;
pAlter->totalBlocks = htonl(pAlter->totalBlocks);
pAlter->daysToKeep0 = htonl(pAlter->daysToKeep0);
pAlter->daysToKeep1 = htonl(pAlter->daysToKeep1);
@ -793,7 +793,7 @@ DROP_DB_OVER:
static int32_t mndProcessDropDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SDropDbMsg *pDrop = pReq->rpcMsg.pCont;
SDropDbReq *pDrop = pReq->rpcMsg.pCont;
mDebug("db:%s, start to drop", pDrop->db);
@ -823,7 +823,7 @@ static int32_t mndProcessDropDbReq(SMnodeMsg *pReq) {
static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
SUseDbMsg *pUse = pReq->rpcMsg.pCont;
SUseDbReq *pUse = pReq->rpcMsg.pCont;
pUse->vgVersion = htonl(pUse->vgVersion);
SDbObj *pDb = mndAcquireDb(pMnode, pUse->db);
@ -836,6 +836,7 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
int32_t contLen = sizeof(SUseDbRsp) + pDb->cfg.numOfVgroups * sizeof(SVgroupInfo);
SUseDbRsp *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
@ -857,7 +858,7 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
pInfo->numOfEps = pVgroup->replica;
for (int32_t gid = 0; gid < pVgroup->replica; ++gid) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[gid];
SEpAddrMsg *pEpArrr = &pInfo->epAddr[gid];
SEpAddr *pEpArrr = &pInfo->epAddr[gid];
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode != NULL) {
memcpy(pEpArrr->fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
@ -890,7 +891,7 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
static int32_t mndProcessSyncDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SSyncDbMsg *pSync = pReq->rpcMsg.pCont;
SSyncDbReq *pSync = pReq->rpcMsg.pCont;
SDbObj *pDb = mndAcquireDb(pMnode, pSync->db);
if (pDb == NULL) {
mError("db:%s, failed to process sync db req since %s", pSync->db, terrstr());
@ -903,7 +904,7 @@ static int32_t mndProcessSyncDbReq(SMnodeMsg *pReq) {
static int32_t mndProcessCompactDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SCompactDbMsg *pCompact = pReq->rpcMsg.pCont;
SCompactDbReq *pCompact = pReq->rpcMsg.pCont;
SDbObj *pDb = mndAcquireDb(pMnode, pCompact->db);
if (pDb == NULL) {
mError("db:%s, failed to process compact db req since %s", pCompact->db, terrstr());
@ -914,7 +915,7 @@ static int32_t mndProcessCompactDbReq(SMnodeMsg *pReq) {
return 0;
}
static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;

View File

@ -53,10 +53,10 @@ static int32_t mndProcessConfigDnodeReq(SMnodeMsg *pReq);
static int32_t mndProcessConfigDnodeRsp(SMnodeMsg *pRsp);
static int32_t mndProcessStatusReq(SMnodeMsg *pReq);
static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveConfigs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveDnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
@ -582,7 +582,7 @@ static int32_t mndProcessConfigDnodeRsp(SMnodeMsg *pRsp) {
mInfo("app:%p config rsp from dnode", pRsp->rpcMsg.ahandle);
}
static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
@ -656,7 +656,7 @@ static int32_t mndRetrieveConfigs(SMnodeMsg *pReq, SShowObj *pShow, char *data,
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {}
static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;

View File

@ -26,12 +26,12 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw);
static int32_t mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc);
static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc);
static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOldFunc, SFuncObj *pNewFunc);
static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncMsg *pCreate);
static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncReq *pCreate);
static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pMsg, SFuncObj *pFunc);
static int32_t mndProcessCreateFuncMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropFuncMsg(SMnodeMsg *pMsg);
static int32_t mndProcessRetrieveFuncMsg(SMnodeMsg *pMsg);
static int32_t mndGetFuncMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetFuncMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveFuncs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextFunc(SMnode *pMnode, void *pIter);
@ -156,7 +156,7 @@ static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOldFunc, SFuncObj *pNe
return 0;
}
static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncMsg *pCreate) {
static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncReq *pCreate) {
SFuncObj *pFunc = calloc(1, sizeof(SFuncObj) + pCreate->commentSize + pCreate->codeSize);
pFunc->createdTime = taosGetTimestampMs();
pFunc->funcType = pCreate->funcType;
@ -264,7 +264,7 @@ static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pMsg, SFuncObj *pFunc) {
static int32_t mndProcessCreateFuncMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SCreateFuncMsg *pCreate = pMsg->rpcMsg.pCont;
SCreateFuncReq *pCreate = pMsg->rpcMsg.pCont;
pCreate->outputLen = htonl(pCreate->outputLen);
pCreate->bufSize = htonl(pCreate->bufSize);
pCreate->sigature = htobe64(pCreate->sigature);
@ -323,7 +323,7 @@ static int32_t mndProcessCreateFuncMsg(SMnodeMsg *pMsg) {
static int32_t mndProcessDropFuncMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SDropFuncMsg *pDrop = pMsg->rpcMsg.pCont;
SDropFuncReq *pDrop = pMsg->rpcMsg.pCont;
mDebug("func:%s, start to drop", pDrop->name);
@ -353,7 +353,7 @@ static int32_t mndProcessDropFuncMsg(SMnodeMsg *pMsg) {
static int32_t mndProcessRetrieveFuncMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SRetrieveFuncMsg *pRetrieve = pMsg->rpcMsg.pCont;
SRetrieveFuncReq *pRetrieve = pMsg->rpcMsg.pCont;
pRetrieve->numOfFuncs = htonl(pRetrieve->numOfFuncs);
int32_t size = sizeof(SRetrieveFuncRsp) + (sizeof(SFuncInfo) + TSDB_FUNC_CODE_LEN) * pRetrieve->numOfFuncs + 16384;
@ -395,7 +395,7 @@ static int32_t mndProcessRetrieveFuncMsg(SMnodeMsg *pMsg) {
return 0;
}
static int32_t mndGetFuncMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetFuncMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;

View File

@ -33,7 +33,7 @@ static int32_t mndProcessDropMnodeReq(SMnodeMsg *pReq);
static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pRsp);
static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pRsp);
static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pRsp);
static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveMnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter);
@ -579,7 +579,7 @@ static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pRsp) {
return 0;
}
static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;

View File

@ -51,9 +51,9 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq);
static int32_t mndProcessConnectReq(SMnodeMsg *pReq);
static int32_t mndProcessKillQueryReq(SMnodeMsg *pReq);
static int32_t mndProcessKillConnReq(SMnodeMsg *pReq);
static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter);
@ -389,7 +389,7 @@ static int32_t mndProcessKillConnReq(SMnodeMsg *pReq) {
}
}
static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
@ -518,7 +518,7 @@ static int32_t mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, in
return numOfRows;
}
static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt;

View File

@ -31,7 +31,7 @@ static int32_t mndProcessCreateQnodeReq(SMnodeMsg *pReq);
static int32_t mndProcessDropQnodeReq(SMnodeMsg *pReq);
static int32_t mndProcessCreateQnodeRsp(SMnodeMsg *pRsp);
static int32_t mndProcessDropQnodeRsp(SMnodeMsg *pRsp);
static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveQnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextQnode(SMnode *pMnode, void *pIter);
@ -391,7 +391,7 @@ static int32_t mndProcessDropQnodeRsp(SMnodeMsg *pRsp) {
return 0;
}
static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;

View File

@ -31,7 +31,7 @@ static int32_t mndProcessCreateSnodeReq(SMnodeMsg *pReq);
static int32_t mndProcessDropSnodeReq(SMnodeMsg *pReq);
static int32_t mndProcessCreateSnodeRsp(SMnodeMsg *pRsp);
static int32_t mndProcessDropSnodeRsp(SMnodeMsg *pRsp);
static int32_t mndGetSnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetSnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveSnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextSnode(SMnode *pMnode, void *pIter);
@ -393,7 +393,7 @@ static int32_t mndProcessDropSnodeRsp(SMnodeMsg *pRsp) {
return 0;
}
static int32_t mndGetSnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetSnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;

View File

@ -32,14 +32,14 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw);
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb);
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb);
static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg);
static int32_t mndProcessAlterStbMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg);
static int32_t mndProcesSMCreateStbReq(SMnodeMsg *pMsg);
static int32_t mndProcesSMAlterStbReq(SMnodeMsg *pMsg);
static int32_t mndProcesSMDropStbReq(SMnodeMsg *pMsg);
static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg);
static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg);
static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg);
static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg);
static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
@ -52,9 +52,9 @@ int32_t mndInitStb(SMnode *pMnode) {
.updateFp = (SdbUpdateFp)mndStbActionUpdate,
.deleteFp = (SdbDeleteFp)mndStbActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessCreateStbMsg);
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessAlterStbMsg);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessDropStbMsg);
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcesSMCreateStbReq);
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcesSMAlterStbReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcesSMDropStbReq);
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndProcessCreateStbInRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessAlterStbInRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessDropStbInRsp);
@ -264,10 +264,10 @@ static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb
return buf;
}
static SVDropStbReq *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) {
int32_t contLen = sizeof(SVDropStbReq);
static SVDropTbReq *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) {
int32_t contLen = sizeof(SVDropTbReq);
SVDropStbReq *pDrop = calloc(1, contLen);
SVDropTbReq *pDrop = calloc(1, contLen);
if (pDrop == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
@ -276,12 +276,12 @@ static SVDropStbReq *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj
pDrop->head.contLen = htonl(contLen);
pDrop->head.vgId = htonl(pVgroup->vgId);
memcpy(pDrop->name, pStb->name, TSDB_TABLE_FNAME_LEN);
pDrop->suid = htobe64(pStb->uid);
// pDrop->suid = htobe64(pStb->uid);
return pDrop;
}
static int32_t mndCheckCreateStbMsg(SCreateStbMsg *pCreate) {
static int32_t mndCheckCreateStbMsg(SMCreateStbReq *pCreate) {
pCreate->numOfColumns = htonl(pCreate->numOfColumns);
pCreate->numOfTags = htonl(pCreate->numOfTags);
int32_t totalCols = pCreate->numOfColumns + pCreate->numOfTags;
@ -398,7 +398,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue;
SVDropStbReq *pMsg = mndBuildDropStbMsg(pMnode, pVgroup, pStb);
SVDropTbReq *pMsg = mndBuildDropStbMsg(pMnode, pVgroup, pStb);
if (pMsg == NULL) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
@ -409,7 +409,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = pMsg;
action.contLen = sizeof(SVDropStbReq);
action.contLen = sizeof(SVDropTbReq);
action.msgType = TDMT_VND_DROP_STB;
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
free(pMsg);
@ -423,7 +423,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
return 0;
}
static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCreate, SDbObj *pDb) {
static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SMCreateStbReq *pCreate, SDbObj *pDb) {
SStbObj stbObj = {0};
tstrncpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
tstrncpy(stbObj.db, pDb->name, TSDB_DB_FNAME_LEN);
@ -494,9 +494,9 @@ CREATE_STB_OVER:
return code;
}
static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) {
static int32_t mndProcesSMCreateStbReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SCreateStbMsg *pCreate = pMsg->rpcMsg.pCont;
SMCreateStbReq *pCreate = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to create", pCreate->name);
@ -551,7 +551,7 @@ static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg) {
return 0;
}
static int32_t mndCheckAlterStbMsg(SAlterStbMsg *pAlter) {
static int32_t mndCheckAlterStbMsg(SMAlterStbReq *pAlter) {
SSchema *pSchema = &pAlter->schema;
pSchema->colId = htonl(pSchema->colId);
pSchema->bytes = htonl(pSchema->bytes);
@ -578,9 +578,9 @@ static int32_t mndCheckAlterStbMsg(SAlterStbMsg *pAlter) {
static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pOldStb, SStbObj *pNewStb) { return 0; }
static int32_t mndProcessAlterStbMsg(SMnodeMsg *pMsg) {
static int32_t mndProcesSMAlterStbReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SAlterStbMsg *pAlter = pMsg->rpcMsg.pCont;
SMAlterStbReq *pAlter = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to alter", pAlter->name);
@ -692,9 +692,9 @@ DROP_STB_OVER:
return 0;
}
static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg) {
static int32_t mndProcesSMDropStbReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SDropStbMsg *pDrop = pMsg->rpcMsg.pCont;
SMDropStbReq *pDrop = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to drop", pDrop->name);
@ -729,7 +729,7 @@ static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) {
static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
STableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to retrieve meta", pInfo->tableFname);
@ -750,9 +750,9 @@ static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
taosRLockLatch(&pStb->lock);
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
int32_t contLen = sizeof(STableMetaMsg) + totalCols * sizeof(SSchema);
int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema);
STableMetaMsg *pMeta = rpcMallocCont(contLen);
STableMetaRsp *pMeta = rpcMallocCont(contLen);
if (pMeta == NULL) {
taosRUnLockLatch(&pStb->lock);
mndReleaseDb(pMnode, pDb);
@ -818,7 +818,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs
return 0;
}
static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;

View File

@ -35,7 +35,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg);
static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg);
static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
@ -186,10 +186,10 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
return mndAcquireDb(pMnode, db);
}
static SDDropTopicMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
int32_t contLen = sizeof(SDDropTopicMsg);
static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
int32_t contLen = sizeof(SDDropTopicReq);
SDDropTopicMsg *pDrop = calloc(1, contLen);
SDDropTopicReq *pDrop = calloc(1, contLen);
if (pDrop == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
@ -274,7 +274,7 @@ static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, SMqTopicObj *pTopic
static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SDropTopicMsg *pDrop = pMsg->rpcMsg.pCont;
SDropTopicReq *pDrop = pMsg->rpcMsg.pCont;
mDebug("topic:%s, start to drop", pDrop->name);
@ -309,7 +309,7 @@ static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg) {
static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
STableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
mDebug("topic:%s, start to retrieve meta", pInfo->tableFname);
@ -331,9 +331,9 @@ static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) {
taosRLockLatch(&pTopic->lock);
int32_t totalCols = pTopic->numOfColumns + pTopic->numOfTags;
int32_t contLen = sizeof(STableMetaMsg) + totalCols * sizeof(SSchema);
int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema);
STableMetaMsg *pMeta = rpcMallocCont(contLen);
STableMetaRsp *pMeta = rpcMallocCont(contLen);
if (pMeta == NULL) {
taosRUnLockLatch(&pTopic->lock);
mndReleaseDb(pMnode, pDb);
@ -397,7 +397,7 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo
return 0;
}
static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;

View File

@ -32,7 +32,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass
static int32_t mndProcessCreateUserReq(SMnodeMsg *pReq);
static int32_t mndProcessAlterUserReq(SMnodeMsg *pReq);
static int32_t mndProcessDropUserReq(SMnodeMsg *pReq);
static int32_t mndGetUserMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetUserMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveUsers(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextUser(SMnode *pMnode, void *pIter);
@ -432,7 +432,7 @@ static int32_t mndProcessDropUserReq(SMnodeMsg *pReq) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static int32_t mndGetUserMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetUserMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;

View File

@ -27,19 +27,19 @@
static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw);
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup);
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg);
static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg);
static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg);
static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pMsg);
static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg);
static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pRsp);
static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pRsp);
static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pRsp);
static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pRsp);
static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pRsp);
static int32_t mndGetVgroupMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndRetrieveVgroups(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndGetVgroupMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveVgroups(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
static int32_t mndGetVnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndRetrieveVnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndGetVnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveVnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
int32_t mndInitVgroup(SMnode *pMnode) {
@ -164,14 +164,14 @@ static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) {
return 0;
}
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup) {
mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOldVgroup->vgId, pOldVgroup, pNewVgroup);
pOldVgroup->updateTime = pNewVgroup->updateTime;
pOldVgroup->version = pNewVgroup->version;
pOldVgroup->hashBegin = pNewVgroup->hashBegin;
pOldVgroup->hashEnd = pNewVgroup->hashEnd;
pOldVgroup->replica = pNewVgroup->replica;
memcpy(pOldVgroup->vnodeGid, pNewVgroup->vnodeGid, TSDB_MAX_REPLICA * sizeof(SVnodeGid));
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);
pOld->updateTime = pNew->updateTime;
pOld->version = pNew->version;
pOld->hashBegin = pNew->hashBegin;
pOld->hashEnd = pNew->hashEnd;
pOld->replica = pNew->replica;
memcpy(pOld->vnodeGid, pNew->vnodeGid, TSDB_MAX_REPLICA * sizeof(SVnodeGid));
return 0;
}
@ -427,24 +427,24 @@ SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup) {
return epset;
}
static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) {
mndTransProcessRsp(pMsg);
static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg) {
mndTransProcessRsp(pMsg);
static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg) {
mndTransProcessRsp(pMsg);
static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pRsp) { return 0; }
static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pRsp) { return 0; }
static bool mndGetVgroupMaxReplicaFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
SVgObj *pVgroup = pObj;
@ -475,8 +475,8 @@ static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pRep
return 0;
}
static int32_t mndGetVgroupMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
SMnode *pMnode = pMsg->pMnode;
static int32_t mndGetVgroupMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
if (mndGetVgroupMaxReplica(pMnode, pShow->db, &pShow->replica, &pShow->numOfRows) != 0) {
@ -526,8 +526,8 @@ static int32_t mndGetVgroupMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg
return 0;
}
static int32_t mndRetrieveVgroups(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode;
static int32_t mndRetrieveVgroups(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SVgObj *pVgroup = NULL;
@ -593,8 +593,8 @@ int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
return numOfVnodes;
}
static int32_t mndGetVnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
SMnode *pMnode = pMsg->pMnode;
static int32_t mndGetVnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
@ -633,8 +633,8 @@ static int32_t mndGetVnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
return 0;
}
static int32_t mndRetrieveVnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode;
static int32_t mndRetrieveVnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SVgObj *pVgroup = NULL;

View File

@ -61,8 +61,8 @@ void mndSendRedirectRsp(SMnode *pMnode, SRpcMsg *pMsg) {
static void mndTransReExecute(void *param, void *tmrId) {
SMnode *pMnode = param;
if (mndIsMaster(pMnode)) {
STransMsg *pMsg = rpcMallocCont(sizeof(STransMsg));
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pMsg, .contLen = sizeof(STransMsg)};
STransReq *pMsg = rpcMallocCont(sizeof(STransReq));
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pMsg, .contLen = sizeof(STransReq)};
pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
}

View File

@ -64,9 +64,9 @@ TEST_F(MndTestDb, 01_ShowDb) {
TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
{
int32_t contLen = sizeof(SCreateDbMsg);
int32_t contLen = sizeof(SCreateDbReq);
SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(contLen);
SCreateDbReq* pReq = (SCreateDbReq*)rpcMallocCont(contLen);
strcpy(pReq->db, "1.d1");
pReq->numOfVgroups = htonl(2);
pReq->cacheBlockSize = htonl(16);
@ -136,9 +136,9 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
CheckBinary("master", 9);
{
int32_t contLen = sizeof(SAlterDbMsg);
int32_t contLen = sizeof(SAlterDbReq);
SAlterDbMsg* pReq = (SAlterDbMsg*)rpcMallocCont(contLen);
SAlterDbReq* pReq = (SAlterDbReq*)rpcMallocCont(contLen);
strcpy(pReq->db, "1.d1");
pReq->totalBlocks = htonl(12);
pReq->daysToKeep0 = htonl(300);
@ -205,9 +205,9 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
CheckInt8(0); // update
{
int32_t contLen = sizeof(SDropDbMsg);
int32_t contLen = sizeof(SDropDbReq);
SDropDbMsg* pReq = (SDropDbMsg*)rpcMallocCont(contLen);
SDropDbReq* pReq = (SDropDbReq*)rpcMallocCont(contLen);
strcpy(pReq->db, "1.d1");
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DB, pReq, contLen);
@ -224,9 +224,9 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
{
int32_t contLen = sizeof(SCreateDbMsg);
int32_t contLen = sizeof(SCreateDbReq);
SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(contLen);
SCreateDbReq* pReq = (SCreateDbReq*)rpcMallocCont(contLen);
strcpy(pReq->db, "1.d2");
pReq->numOfVgroups = htonl(2);
pReq->cacheBlockSize = htonl(16);
@ -261,9 +261,9 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
CheckBinary("d2", TSDB_DB_NAME_LEN - 1);
{
int32_t contLen = sizeof(SUseDbMsg);
int32_t contLen = sizeof(SUseDbReq);
SUseDbMsg* pReq = (SUseDbMsg*)rpcMallocCont(contLen);
SUseDbReq* pReq = (SUseDbReq*)rpcMallocCont(contLen);
strcpy(pReq->db, "1.d2");
pReq->vgVersion = htonl(-1);
@ -290,7 +290,7 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
EXPECT_EQ(pInfo->hashEnd, UINT32_MAX / 2 - 1);
EXPECT_EQ(pInfo->inUse, 0);
EXPECT_EQ(pInfo->numOfEps, 1);
SEpAddrMsg* pAddr = &pInfo->epAddr[0];
SEpAddr* pAddr = &pInfo->epAddr[0];
pAddr->port = htons(pAddr->port);
EXPECT_EQ(pAddr->port, 9030);
EXPECT_STREQ(pAddr->fqdn, "localhost");
@ -306,7 +306,7 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
EXPECT_EQ(pInfo->hashEnd, UINT32_MAX);
EXPECT_EQ(pInfo->inUse, 0);
EXPECT_EQ(pInfo->numOfEps, 1);
SEpAddrMsg* pAddr = &pInfo->epAddr[0];
SEpAddr* pAddr = &pInfo->epAddr[0];
pAddr->port = htons(pAddr->port);
EXPECT_EQ(pAddr->port, 9030);
EXPECT_STREQ(pAddr->fqdn, "localhost");
@ -314,9 +314,9 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
}
{
int32_t contLen = sizeof(SDropDbMsg);
int32_t contLen = sizeof(SDropDbReq);
SDropDbMsg* pReq = (SDropDbMsg*)rpcMallocCont(contLen);
SDropDbReq* pReq = (SDropDbReq*)rpcMallocCont(contLen);
strcpy(pReq->db, "1.d2");
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DB, pReq, contLen);

View File

@ -63,7 +63,7 @@ TEST_F(MndTestShow, 03_ShowMsg_Conn) {
test.SendShowMetaReq(TSDB_MGMT_TABLE_CONNS, "");
STableMetaMsg* pMeta = test.GetShowMeta();
STableMetaRsp* pMeta = test.GetShowMeta();
EXPECT_STREQ(pMeta->tbFname, "show connections");
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->numOfColumns, 7);

View File

@ -27,9 +27,9 @@ Testbase MndTestStb::test;
TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
{
int32_t contLen = sizeof(SCreateDbMsg);
int32_t contLen = sizeof(SCreateDbReq);
SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(contLen);
SCreateDbReq* pReq = (SCreateDbReq*)rpcMallocCont(contLen);
strcpy(pReq->db, "1.d1");
pReq->numOfVgroups = htonl(2);
pReq->cacheBlockSize = htonl(16);
@ -59,9 +59,9 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
{
int32_t cols = 2;
int32_t tags = 3;
int32_t contLen = (tags + cols) * sizeof(SSchema) + sizeof(SCreateStbMsg);
int32_t contLen = (tags + cols) * sizeof(SSchema) + sizeof(SMCreateStbReq);
SCreateStbMsg* pReq = (SCreateStbMsg*)rpcMallocCont(contLen);
SMCreateStbReq* pReq = (SMCreateStbReq*)rpcMallocCont(contLen);
strcpy(pReq->name, "1.d1.stb");
pReq->numOfTags = htonl(tags);
pReq->numOfColumns = htonl(cols);
@ -123,16 +123,16 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
// ----- meta ------
{
int32_t contLen = sizeof(STableInfoMsg);
int32_t contLen = sizeof(STableInfoReq);
STableInfoMsg* pReq = (STableInfoMsg*)rpcMallocCont(contLen);
STableInfoReq* pReq = (STableInfoReq*)rpcMallocCont(contLen);
strcpy(pReq->tableFname, "1.d1.stb");
SRpcMsg* pMsg = test.SendReq(TDMT_MND_STB_META, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
STableMetaMsg* pRsp = (STableMetaMsg*)pMsg->pCont;
STableMetaRsp* pRsp = (STableMetaRsp*)pMsg->pCont;
pRsp->numOfTags = htonl(pRsp->numOfTags);
pRsp->numOfColumns = htonl(pRsp->numOfColumns);
pRsp->sversion = htonl(pRsp->sversion);
@ -214,9 +214,9 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
CheckInt32(3);
{
int32_t contLen = sizeof(SDropStbMsg);
int32_t contLen = sizeof(SMDropStbReq);
SDropStbMsg* pReq = (SDropStbMsg*)rpcMallocCont(contLen);
SMDropStbReq* pReq = (SMDropStbReq*)rpcMallocCont(contLen);
strcpy(pReq->name, "1.d1.stb");
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_STB, pReq, contLen);

View File

@ -49,7 +49,6 @@ typedef struct SVnodeTask {
typedef struct SVnodeMgr {
td_mode_flag_t vnodeInitFlag;
td_mode_flag_t vnodeClearFlag;
// For commit
bool stop;
uint16_t nthreads;

View File

@ -22,9 +22,9 @@
extern "C" {
#endif
// SVDropTableReq
// int vnodeBuildDropTableReq(void **buf, const SVDropTableReq *pReq);
// void *vnodeParseDropTableReq(void *buf, SVDropTableReq *pReq);
// SVDropTbReq
// int vnodeBuildDropTableReq(void **buf, const SVDropTbReq *pReq);
// void *vnodeParseDropTableReq(void *buf, SVDropTbReq *pReq);
#ifdef __cplusplus
}

View File

@ -40,6 +40,7 @@ int vnodeAsyncCommit(SVnode *pVnode) {
int vnodeSyncCommit(SVnode *pVnode) {
vnodeAsyncCommit(pVnode);
vnodeWaitCommit(pVnode);
tsem_post(&(pVnode->canCommit));
return 0;
}

View File

@ -15,7 +15,7 @@
#include "vnodeDef.h"
SVnodeMgr vnodeMgr = {.vnodeInitFlag = TD_MOD_UNINITIALIZED, .vnodeClearFlag = TD_MOD_UNCLEARD, .stop = false};
SVnodeMgr vnodeMgr = {.vnodeInitFlag = TD_MOD_UNINITIALIZED};
static void* loop(void* arg);
@ -24,6 +24,8 @@ int vnodeInit(uint16_t nthreads) {
return 0;
}
vnodeMgr.stop = false;
// Start commit handers
if (nthreads > 0) {
vnodeMgr.nthreads = nthreads;
@ -38,6 +40,7 @@ int vnodeInit(uint16_t nthreads) {
for (uint16_t i = 0; i < nthreads; i++) {
pthread_create(&(vnodeMgr.threads[i]), NULL, loop, NULL);
pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread");
}
} else {
// TODO: if no commit thread is set, then another mechanism should be
@ -53,7 +56,7 @@ int vnodeInit(uint16_t nthreads) {
}
void vnodeClear() {
if (TD_CHECK_AND_SET_MOD_CLEAR(&(vnodeMgr.vnodeClearFlag)) == TD_MOD_CLEARD) {
if (TD_CHECK_AND_SET_MOD_CLEAR(&(vnodeMgr.vnodeInitFlag)) == TD_MOD_UNINITIALIZED) {
return;
}

View File

@ -53,14 +53,14 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
}
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
STableInfoMsg * pReq = (STableInfoMsg *)(pMsg->pCont);
STableInfoReq * pReq = (STableInfoReq *)(pMsg->pCont);
STbCfg * pTbCfg = NULL;
STbCfg * pStbCfg = NULL;
tb_uid_t uid;
int32_t nCols;
int32_t nTagCols;
SSchemaWrapper *pSW;
STableMetaMsg * pTbMetaMsg = NULL;
STableMetaRsp * pTbMetaMsg = NULL;
SSchema * pTagSchema;
SRpcMsg rpcMsg;
int msgLen = 0;
@ -94,8 +94,8 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
pTagSchema = NULL;
}
msgLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (nCols + nTagCols);
pTbMetaMsg = (STableMetaMsg *)rpcMallocCont(msgLen);
msgLen = sizeof(STableMetaRsp) + sizeof(SSchema) * (nCols + nTagCols);
pTbMetaMsg = (STableMetaRsp *)rpcMallocCont(msgLen);
if (pTbMetaMsg == NULL) {
goto _exit;
}
@ -167,7 +167,7 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) {
// SVShowTablesFetchReq *pFetchReq = pMsg->pCont;
SVShowTablesFetchRsp *pFetchRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp) + payloadLen);
memset(pFetchRsp, 0, sizeof(struct SVShowTablesFetchRsp) + payloadLen);
memset(pFetchRsp, 0, sizeof(SVShowTablesFetchRsp) + payloadLen);
char *p = pFetchRsp->data;
for (int32_t i = 0; i < numOfTables; ++i) {

View File

@ -108,12 +108,12 @@ static void *vnodeParseCreateTableReq(void *buf, SVCreateTableReq *pReq) {
return buf;
}
int vnodeBuildDropTableReq(void **buf, const SVDropTableReq *pReq) {
int vnodeBuildDropTableReq(void **buf, const SVDropTbReq *pReq) {
// TODO
return 0;
}
void *vnodeParseDropTableReq(void *buf, SVDropTableReq *pReq) {
void *vnodeParseDropTableReq(void *buf, SVDropTbReq *pReq) {
// TODO
}
#endif

View File

@ -54,7 +54,7 @@ typedef struct STsdbMemTable {
STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb);
void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable);
int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp);
int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, SShellSubmitRsp *pRsp);
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo);

View File

@ -73,7 +73,7 @@ void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable) {
}
}
int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) {
int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, SShellSubmitRsp *pRsp) {
SSubmitBlk * pBlock = NULL;
SSubmitMsgIter msgIter = {0};
int32_t affectedrows = 0, numOfRows = 0;

View File

@ -68,7 +68,7 @@ char *ctgTestSTablename = "stable1";
void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) {
SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg));
SCreateDbReq* pReq = (SCreateDbReq*)rpcMallocCont(sizeof(SCreateDbReq));
strcpy(pReq->db, "1.db1");
pReq->numOfVgroups = htonl(2);
pReq->cacheBlockSize = htonl(16);
@ -92,7 +92,7 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) {
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateDbMsg);
rpcMsg.contLen = sizeof(SCreateDbReq);
rpcMsg.msgType = TDMT_MND_CREATE_DB;
SRpcMsg rpcRsp = {0};
@ -200,7 +200,7 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) {
vgInfo.numOfEps = i % TSDB_MAX_REPLICA + 1;
vgInfo.inUse = i % vgInfo.numOfEps;
for (int32_t n = 0; n < vgInfo.numOfEps; ++n) {
SEpAddrMsg *addr = &vgInfo.epAddr[n];
SEpAddr *addr = &vgInfo.epAddr[n];
strcpy(addr->fqdn, "a0");
addr->port = htons(n + 22);
}
@ -234,7 +234,7 @@ void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
vg->numOfEps = i % TSDB_MAX_REPLICA + 1;
vg->inUse = i % vg->numOfEps;
for (int32_t n = 0; n < vg->numOfEps; ++n) {
SEpAddrMsg *addr = &vg->epAddr[n];
SEpAddr *addr = &vg->epAddr[n];
strcpy(addr->fqdn, "a0");
addr->port = htons(n + 22);
}
@ -249,12 +249,12 @@ void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
STableMetaMsg *rspMsg = NULL; //todo
STableMetaRsp *rspMsg = NULL; //todo
pRsp->code =0;
pRsp->contLen = sizeof(STableMetaMsg) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaMsg *)pRsp->pCont;
rspMsg = (STableMetaRsp *)pRsp->pCont;
sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestTablename);
rspMsg->numOfTags = 0;
rspMsg->numOfColumns = htonl(ctgTestColNum);
@ -285,12 +285,12 @@ void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
void ctgTestPrepareCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
STableMetaMsg *rspMsg = NULL; //todo
STableMetaRsp *rspMsg = NULL; //todo
pRsp->code =0;
pRsp->contLen = sizeof(STableMetaMsg) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaMsg *)pRsp->pCont;
rspMsg = (STableMetaRsp *)pRsp->pCont;
sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestCTablename);
sprintf(rspMsg->stbFname, "%s.%s", ctgTestDbname, ctgTestSTablename);
rspMsg->numOfTags = htonl(ctgTestTagNum);
@ -329,12 +329,12 @@ void ctgTestPrepareCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
STableMetaMsg *rspMsg = NULL; //todo
STableMetaRsp *rspMsg = NULL; //todo
pRsp->code =0;
pRsp->contLen = sizeof(STableMetaMsg) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaMsg *)pRsp->pCont;
rspMsg = (STableMetaRsp *)pRsp->pCont;
sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestSTablename);
sprintf(rspMsg->stbFname, "%s.%s", ctgTestDbname, ctgTestSTablename);
rspMsg->numOfTags = htonl(ctgTestTagNum);
@ -372,13 +372,13 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
}
void ctgTestPrepareMultiSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
STableMetaMsg *rspMsg = NULL; //todo
STableMetaRsp *rspMsg = NULL; //todo
static int32_t idx = 1;
pRsp->code =0;
pRsp->contLen = sizeof(STableMetaMsg) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaMsg *)pRsp->pCont;
rspMsg = (STableMetaRsp *)pRsp->pCont;
sprintf(rspMsg->tbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx);
sprintf(rspMsg->stbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx);
rspMsg->numOfTags = htonl(ctgTestTagNum);

View File

@ -589,17 +589,17 @@ void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity);
void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput);
void freeParam(STaskParam *param);
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, STaskParam* param);
int32_t convertQueryMsg(SQueryTableReq *pQueryMsg, STaskParam* param);
int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo,
SSqlExpr** pExprMsg, SColumnInfo* pTagCols, int32_t queryType, void* pMsg, struct SUdfInfo* pUdfInfo);
int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo,
int32_t createIndirectQueryFuncExprFromMsg(SQueryTableReq *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo,
SSqlExpr **pExpr, SExprInfo *prevExpr, struct SUdfInfo *pUdfInfo);
int32_t createQueryFilter(char *data, uint16_t len, SFilterInfo** pFilters);
SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code);
SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableReq *pQueryMsg, SColIndex *pColIndex, int32_t *code);
SQInfo *createQInfoImpl(SQueryTableReq *pQueryMsg, SGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, SFilterInfo* pFilters, int32_t vgId, char* sql, uint64_t qId, struct SUdfInfo* pUdfInfo);
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start,

View File

@ -134,8 +134,8 @@ int waitMoment(SQInfo* pQInfo){
}
#endif
bool qExecTask(qTaskInfo_t qinfo) {
SExecTaskInfo * pTaskInfo = (SExecTaskInfo *)qinfo;
bool qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes) {
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *) tinfo;
int64_t threadId = taosGetSelfPthreadId();
int64_t curOwner = 0;
@ -176,12 +176,10 @@ bool qExecTask(qTaskInfo_t qinfo) {
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
int64_t st = taosGetTimestampUs();
void* pOutput = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup);
*pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup);
// todo put the result into sink node.
pTaskInfo->cost.elapsedTime += (taosGetTimestampUs() - st);
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (isTaskKilled(pTaskInfo)) {

View File

@ -2457,7 +2457,7 @@ static void doUpdateLastKey(STaskAttr* pQueryAttr) {
}
}
static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bool stableQuery) {
static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableReq* pQueryMsg, bool stableQuery) {
STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
// in case of point-interpolation query, use asc order scan
@ -7061,7 +7061,7 @@ bool validateExprColumnInfo(SQueriedTableInfo *pTableInfo, SSqlExpr *pExpr, SCol
return j != INT32_MIN;
}
static bool validateQueryMsg(SQueryTableMsg *pQueryMsg) {
static bool validateQueryMsg(SQueryTableReq *pQueryMsg) {
if (pQueryMsg->interval.interval < 0) {
//qError("qmsg:%p illegal value of interval time %" PRId64, pQueryMsg, pQueryMsg->interval.interval);
return false;
@ -7127,7 +7127,7 @@ static bool validateQueryTableCols(SQueriedTableInfo* pTableInfo, SSqlExpr** pEx
return true;
}
static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **pTableIdList) {
static char *createTableIdList(SQueryTableReq *pQueryMsg, char *pMsg, SArray **pTableIdList) {
assert(pQueryMsg->numOfTables > 0);
*pTableIdList = taosArrayInit(pQueryMsg->numOfTables, sizeof(STableIdInfo));
@ -7231,7 +7231,7 @@ int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* r
* @param pExpr
* @return
*/
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, STaskParam* param) {
int32_t convertQueryMsg(SQueryTableReq *pQueryMsg, STaskParam* param) {
int32_t code = TSDB_CODE_SUCCESS;
// if (taosCheckVersion(pQueryMsg->version, version, 3) != 0) {
@ -7806,7 +7806,7 @@ int32_t createQueryFilter(char *data, uint16_t len, SFilterInfo** pFilters) {
// todo refactor
int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutput, SExprInfo** pExprInfo,
int32_t createIndirectQueryFuncExprFromMsg(SQueryTableReq* pQueryMsg, int32_t numOfOutput, SExprInfo** pExprInfo,
SSqlExpr** pExpr, SExprInfo* prevExpr, struct SUdfInfo *pUdfInfo) {
// *pExprInfo = NULL;
// int32_t code = TSDB_CODE_SUCCESS;
@ -7864,7 +7864,7 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t nu
return TSDB_CODE_SUCCESS;
}
SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code) {
SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableReq *pQueryMsg, SColIndex *pColIndex, int32_t *code) {
if (pQueryMsg->numOfGroupCols == 0) {
return NULL;
}
@ -8032,7 +8032,7 @@ FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) {
return ((SQInfo *)qHandle)->qId == qId;
}
SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs,
SQInfo* createQInfoImpl(SQueryTableReq* pQueryMsg, SGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs,
SExprInfo* pSecExprs, STableGroupInfo* pTableGroupInfo, SColumnInfo* pTagCols, SFilterInfo* pFilters, int32_t vgId,
char* sql, uint64_t qId, struct SUdfInfo* pUdfInfo) {
int16_t numOfCols = pQueryMsg->numOfCols;

View File

@ -9,9 +9,9 @@ SCreateUserReq* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, in
SCreateAcctReq* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
SDropUserReq* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
SShowReq* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx* pParseCtx, char* msgBuf, int32_t msgLen);
SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf);
SCreateStbMsg* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
SDropStbMsg* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf);
SMCreateStbReq* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
SMDropStbReq* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
SCreateDnodeReq *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf);
SDropDnodeReq *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf);

View File

@ -112,7 +112,7 @@ SShowReq* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, char* msgBuf,
return pShowMsg;
}
static int32_t setKeepOption(SCreateDbMsg* pMsg, const SCreateDbInfo* pCreateDb, SMsgBuf* pMsgBuf) {
static int32_t setKeepOption(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDb, SMsgBuf* pMsgBuf) {
const char* msg1 = "invalid number of keep options";
const char* msg2 = "invalid keep value";
const char* msg3 = "invalid keep value, should be keep0 <= keep1 <= keep2";
@ -151,7 +151,7 @@ static int32_t setKeepOption(SCreateDbMsg* pMsg, const SCreateDbInfo* pCreateDb,
return TSDB_CODE_SUCCESS;
}
static int32_t setTimePrecision(SCreateDbMsg* pMsg, const SCreateDbInfo* pCreateDbInfo, SMsgBuf* pMsgBuf) {
static int32_t setTimePrecision(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDbInfo, SMsgBuf* pMsgBuf) {
const char* msg = "invalid time precision";
pMsg->precision = TSDB_TIME_PRECISION_MILLI; // millisecond by default
@ -178,7 +178,7 @@ static int32_t setTimePrecision(SCreateDbMsg* pMsg, const SCreateDbInfo* pCreate
return TSDB_CODE_SUCCESS;
}
static void doSetDbOptions(SCreateDbMsg* pMsg, const SCreateDbInfo* pCreateDb) {
static void doSetDbOptions(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDb) {
pMsg->cacheBlockSize = htonl(pCreateDb->cacheBlockSize);
pMsg->totalBlocks = htonl(pCreateDb->numOfBlocks);
pMsg->daysPerFile = htonl(pCreateDb->daysPerFile);
@ -196,7 +196,7 @@ static void doSetDbOptions(SCreateDbMsg* pMsg, const SCreateDbInfo* pCreateDb) {
pMsg->numOfVgroups = htonl(pCreateDb->numOfVgroups);
}
int32_t setDbOptions(SCreateDbMsg* pCreateDbMsg, const SCreateDbInfo* pCreateDbSql, SMsgBuf* pMsgBuf) {
int32_t setDbOptions(SCreateDbReq* pCreateDbMsg, const SCreateDbInfo* pCreateDbSql, SMsgBuf* pMsgBuf) {
doSetDbOptions(pCreateDbMsg, pCreateDbSql);
if (setKeepOption(pCreateDbMsg, pCreateDbSql, pMsgBuf) != TSDB_CODE_SUCCESS) {
@ -210,8 +210,8 @@ int32_t setDbOptions(SCreateDbMsg* pCreateDbMsg, const SCreateDbInfo* pCreateDbS
return TSDB_CODE_SUCCESS;
}
SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf) {
SCreateDbMsg* pCreateMsg = calloc(1, sizeof(SCreateDbMsg));
SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf) {
SCreateDbReq* pCreateMsg = calloc(1, sizeof(SCreateDbReq));
if (setDbOptions(pCreateMsg, pCreateDbInfo, pMsgBuf) != TSDB_CODE_SUCCESS) {
tfree(pCreateMsg);
terrno = TSDB_CODE_TSC_INVALID_OPERATION;
@ -230,7 +230,7 @@ SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCt
return pCreateMsg;
}
SCreateStbMsg* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) {
SMCreateStbReq* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) {
SSchema* pSchema;
int32_t numOfTags = 0;
@ -239,7 +239,7 @@ SCreateStbMsg* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len,
numOfTags = (int32_t) taosArrayGetSize(pCreateTableSql->colInfo.pTagColumns);
}
SCreateStbMsg* pCreateStbMsg = (SCreateStbMsg*)calloc(1, sizeof(SCreateStbMsg) + (numOfCols + numOfTags) * sizeof(SSchema));
SMCreateStbReq* pCreateStbMsg = (SMCreateStbReq*)calloc(1, sizeof(SMCreateStbReq) + (numOfCols + numOfTags) * sizeof(SSchema));
char* pMsg = NULL;
#if 0
@ -315,7 +315,7 @@ SCreateStbMsg* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len,
return pCreateStbMsg;
}
SDropStbMsg* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) {
SMDropStbReq* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) {
SToken* tableName = taosArrayGet(pInfo->pMiscInfo->a, 0);
SName name = {0};
@ -325,13 +325,13 @@ SDropStbMsg* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* p
return NULL;
}
SDropStbMsg *pDropTableMsg = (SDropStbMsg*) calloc(1, sizeof(SDropStbMsg));
SMDropStbReq *pDropTableMsg = (SMDropStbReq*) calloc(1, sizeof(SMDropStbReq));
code = tNameExtractFullName(&name, pDropTableMsg->name);
assert(code == TSDB_CODE_SUCCESS && name.type == TSDB_TABLE_NAME_T);
pDropTableMsg->igNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0;
*len = sizeof(SDropStbMsg);
*len = sizeof(SMDropStbReq);
return pDropTableMsg;
}

View File

@ -117,7 +117,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou
}
// can only perform the parameters based on the macro definitation
static int32_t doCheckDbOptions(SCreateDbMsg* pCreate, SMsgBuf* pMsgBuf) {
static int32_t doCheckDbOptions(SCreateDbReq* pCreate, SMsgBuf* pMsgBuf) {
char msg[512] = {0};
if (pCreate->walLevel != -1 && (pCreate->walLevel < TSDB_MIN_WAL_LEVEL || pCreate->walLevel > TSDB_MAX_WAL_LEVEL)) {
@ -842,11 +842,11 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c
goto _error;
}
SUseDbMsg* pUseDbMsg = (SUseDbMsg*)calloc(1, sizeof(SUseDbMsg));
SUseDbReq* pUseDbMsg = (SUseDbReq*)calloc(1, sizeof(SUseDbReq));
tNameExtractFullName(&n, pUseDbMsg->db);
pDcl->pMsg = (char*)pUseDbMsg;
pDcl->msgLen = sizeof(SUseDbMsg);
pDcl->msgLen = sizeof(SUseDbReq);
pDcl->msgType = TDMT_MND_USE_DB;
break;
}
@ -870,14 +870,14 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c
goto _error;
}
SCreateDbMsg* pCreateMsg = buildCreateDbMsg(pCreateDB, pCtx, pMsgBuf);
SCreateDbReq* pCreateMsg = buildCreateDbMsg(pCreateDB, pCtx, pMsgBuf);
if (doCheckDbOptions(pCreateMsg, pMsgBuf) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_INVALID_OPERATION;
goto _error;
}
pDcl->pMsg = (char*)pCreateMsg;
pDcl->msgLen = sizeof(SCreateDbMsg);
pDcl->msgLen = sizeof(SCreateDbReq);
pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_DB) ? TDMT_MND_CREATE_DB : TDMT_MND_ALTER_DB;
break;
}
@ -895,14 +895,14 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c
goto _error;
}
SDropDbMsg* pDropDbMsg = (SDropDbMsg*)calloc(1, sizeof(SDropDbMsg));
SDropDbReq* pDropDbMsg = (SDropDbReq*)calloc(1, sizeof(SDropDbReq));
code = tNameExtractFullName(&name, pDropDbMsg->db);
pDropDbMsg->ignoreNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0;
assert(code == TSDB_CODE_SUCCESS && name.type == TSDB_DB_NAME_T);
pDcl->msgType = TDMT_MND_DROP_DB;
pDcl->msgLen = sizeof(SDropDbMsg);
pDcl->msgLen = sizeof(SDropDbReq);
pDcl->pMsg = (char*)pDropDbMsg;
break;
}

View File

@ -202,6 +202,30 @@ static bool schemaFromJson(const cJSON* json, void* obj) {
return true;
}
static const char* jkDataBlockSchemaSlotSchema = "SlotSchema";
static const char* jkDataBlockSchemaResultRowSize = "resultRowSize";
static const char* jkDataBlockSchemaPrecision = "Precision";
static bool dataBlockSchemaToJson(const void* obj, cJSON* json) {
const SDataBlockSchema* schema = (const SDataBlockSchema*)obj;
bool res = addRawArray(json, jkDataBlockSchemaSlotSchema, schemaToJson, schema->pSchema, sizeof(SSlotSchema), schema->numOfCols);
if (res) {
res = cJSON_AddNumberToObject(json, jkDataBlockSchemaResultRowSize, schema->resultRowSize);
}
if (res) {
res = cJSON_AddNumberToObject(json, jkDataBlockSchemaPrecision, schema->precision);
}
return res;
}
static bool dataBlockSchemaFromJson(const cJSON* json, void* obj) {
SDataBlockSchema* schema = (SDataBlockSchema*)obj;
schema->resultRowSize = getNumber(json, jkDataBlockSchemaResultRowSize);
schema->precision = getNumber(json, jkDataBlockSchemaPrecision);
return fromRawArray(json, jkDataBlockSchemaSlotSchema, schemaFromJson, (void**) &(schema->pSchema), sizeof(SSlotSchema), &schema->numOfCols);
}
static const char* jkColumnFilterInfoLowerRelOptr = "LowerRelOptr";
static const char* jkColumnFilterInfoUpperRelOptr = "UpperRelOptr";
static const char* jkColumnFilterInfoFilterstr = "Filterstr";
@ -561,7 +585,7 @@ static const char* jkEpAddrFqdn = "Fqdn";
static const char* jkEpAddrPort = "Port";
static bool epAddrToJson(const void* obj, cJSON* json) {
const SEpAddrMsg* ep = (const SEpAddrMsg*)obj;
const SEpAddr* ep = (const SEpAddr*)obj;
bool res = cJSON_AddStringToObject(json, jkEpAddrFqdn, ep->fqdn);
if (res) {
res = cJSON_AddNumberToObject(json, jkEpAddrPort, ep->port);
@ -570,7 +594,7 @@ static bool epAddrToJson(const void* obj, cJSON* json) {
}
static bool epAddrFromJson(const cJSON* json, void* obj) {
SEpAddrMsg* ep = (SEpAddrMsg*)obj;
SEpAddr* ep = (SEpAddr*)obj;
copyString(json, jkEpAddrFqdn, ep->fqdn);
ep->port = getNumber(json, jkEpAddrPort);
return true;
@ -587,7 +611,7 @@ static bool nodeAddrToJson(const void* obj, cJSON* json) {
res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, ep->inUse);
}
if (res) {
res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, ep->epAddr, ep->numOfEps, sizeof(SEpAddrMsg));
res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, ep->epAddr, ep->numOfEps, sizeof(SEpAddr));
}
return res;
}
@ -597,7 +621,7 @@ static bool nodeAddrFromJson(const cJSON* json, void* obj) {
ep->nodeId = getNumber(json, jkNodeAddrId);
ep->inUse = getNumber(json, jkNodeAddrInUse);
int32_t numOfEps = 0;
bool res = fromRawArray(json, jkNodeAddrEpAddrs, nodeAddrFromJson, &ep->epAddr, sizeof(SEpAddrMsg), &numOfEps);
bool res = fromRawArray(json, jkNodeAddrEpAddrs, nodeAddrFromJson, &ep->epAddr, sizeof(SEpAddr), &numOfEps);
ep->numOfEps = numOfEps;
return res;
}
@ -712,7 +736,7 @@ static bool phyNodeToJson(const void* obj, cJSON* jNode) {
res = addArray(jNode, jkPnodeConditions, exprInfoToJson, phyNode->pConditions);
}
if (res) {
res = addRawArray(jNode, jkPnodeSchema, schemaToJson, phyNode->targetSchema.pSchema, sizeof(SSlotSchema), phyNode->targetSchema.numOfCols);
res = addObject(jNode, jkPnodeSchema, dataBlockSchemaToJson, &phyNode->targetSchema);
}
if (res) {
res = addArray(jNode, jkPnodeChildren, phyNodeToJson, phyNode->pChildren);
@ -732,7 +756,7 @@ static bool phyNodeFromJson(const cJSON* json, void* obj) {
res = fromArray(json, jkPnodeConditions, exprInfoFromJson, &node->pConditions, sizeof(SExprInfo));
}
if (res) {
res = fromRawArray(json, jkPnodeSchema, schemaFromJson, (void**)&node->targetSchema.pSchema, sizeof(SSlotSchema), &node->targetSchema.numOfCols);
res = fromObject(json, jkPnodeSchema, dataBlockSchemaFromJson, &node->targetSchema, true);
}
if (res) {
res = fromArray(json, jkPnodeChildren, phyNodeFromJson, &node->pChildren, sizeof(SSlotSchema));
@ -790,6 +814,7 @@ static bool specificDataSinkFromJson(const cJSON* json, void* obj) {
}
static const char* jkDataSinkName = "Name";
static const char* jkDataSinkSchema = "Schema";
static bool dataSinkToJson(const void* obj, cJSON* json) {
const SDataSink* dsink = (const SDataSink*)obj;
@ -797,6 +822,9 @@ static bool dataSinkToJson(const void* obj, cJSON* json) {
if (res) {
res = addObject(json, dsink->info.name, specificDataSinkToJson, dsink);
}
if (res) {
res = addObject(json, jkDataSinkSchema, dataBlockSchemaToJson, &dsink->schema);
}
return res;
}
@ -804,7 +832,11 @@ static bool dataSinkFromJson(const cJSON* json, void* obj) {
SDataSink* dsink = (SDataSink*)obj;
dsink->info.name = getString(json, jkDataSinkName);
dsink->info.type = dsinkNameToDsinkType(dsink->info.name);
return fromObject(json, dsink->info.name, specificDataSinkFromJson, dsink, true);
bool res = fromObject(json, jkDataSinkSchema, dataBlockSchemaFromJson, &dsink->schema, true);
if (res) {
res = fromObject(json, dsink->info.name, specificDataSinkFromJson, dsink, true);
}
return res;
}
static const char* jkIdQueryId = "QueryId";

View File

@ -29,7 +29,7 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3
SBuildTableMetaInput* bInput = (SBuildTableMetaInput *)input;
int32_t estimateSize = sizeof(STableInfoMsg);
int32_t estimateSize = sizeof(STableInfoReq);
if (NULL == *msg || msgSize < estimateSize) {
tfree(*msg);
*msg = rpcMallocCont(estimateSize);
@ -38,7 +38,7 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3
}
}
STableInfoMsg *bMsg = (STableInfoMsg *)*msg;
STableInfoReq *bMsg = (STableInfoReq *)*msg;
bMsg->header.vgId = htonl(bInput->vgId);
@ -59,7 +59,7 @@ int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *ms
SBuildUseDBInput* bInput = (SBuildUseDBInput *)input;
int32_t estimateSize = sizeof(SUseDbMsg);
int32_t estimateSize = sizeof(SUseDbReq);
if (NULL == *msg || msgSize < estimateSize) {
tfree(*msg);
*msg = rpcMallocCont(estimateSize);
@ -68,7 +68,7 @@ int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *ms
}
}
SUseDbMsg *bMsg = (SUseDbMsg *)*msg;
SUseDbReq *bMsg = (SUseDbReq *)*msg;
strncpy(bMsg->db, bInput->db, sizeof(bMsg->db));
bMsg->db[sizeof(bMsg->db) - 1] = 0;
@ -146,7 +146,7 @@ _return:
return code;
}
static int32_t queryConvertTableMetaMsg(STableMetaMsg* pMetaMsg) {
static int32_t queryConvertTableMetaMsg(STableMetaRsp* pMetaMsg) {
pMetaMsg->numOfTags = ntohl(pMetaMsg->numOfTags);
pMetaMsg->numOfColumns = ntohl(pMetaMsg->numOfColumns);
pMetaMsg->sversion = ntohl(pMetaMsg->sversion);
@ -198,7 +198,7 @@ static int32_t queryConvertTableMetaMsg(STableMetaMsg* pMetaMsg) {
return TSDB_CODE_SUCCESS;
}
int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STableMeta **pMeta) {
int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta **pMeta) {
int32_t total = msg->numOfColumns + msg->numOfTags;
int32_t metaSize = sizeof(STableMeta) + sizeof(SSchema) * total;
@ -232,7 +232,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STabl
int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
STableMetaMsg *pMetaMsg = (STableMetaMsg *)msg;
STableMetaRsp *pMetaMsg = (STableMetaRsp *)msg;
int32_t code = queryConvertTableMetaMsg(pMetaMsg);
if (code != TSDB_CODE_SUCCESS) {
return code;

View File

@ -1,4 +1,5 @@
#include "qworker.h"
#include <common.h>
#include "executor.h"
#include "planner.h"
#include "query.h"
@ -1028,8 +1029,6 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(node, 0, (struct SSubplan *)plan, &pTaskInfo);
//TODO call executer to init subquery
if (code) {
QW_ERR_JRET(code);
} else {
@ -1040,22 +1039,20 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
queryRsped = true;
//TODO call executer to execute subquery
code = qExecTask(pTaskInfo);
void *data = NULL;
SSDataBlock* pRes = NULL;
code = qExecTask(pTaskInfo, &pRes);
queryDone = false;
//TODO call executer to execute subquery
if (code) {
QW_ERR_JRET(code);
} else {
QW_ERR_JRET(qwAddTaskResCache(qWorkerMgmt, msg->queryId, msg->taskId, data));
QW_ERR_JRET(qwAddTaskResCache(qWorkerMgmt, msg->queryId, msg->taskId, pRes));
QW_ERR_JRET(qwUpdateTaskStatus(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_PARTIAL_SUCCEED));
}
_return:
if (queryRsped) {
code = qwCheckAndSendReadyRsp(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg, code);
} else {
@ -1081,7 +1078,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
return TSDB_CODE_QRY_INVALID_INPUT;
}
SResReadyMsg *msg = pMsg->pCont;
SResReadyReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task status msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
@ -1102,7 +1099,7 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
}
int32_t code = 0;
SSchTasksStatusMsg *msg = pMsg->pCont;
SSchTasksStatusReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task status msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
@ -1126,7 +1123,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SResFetchMsg *msg = pMsg->pCont;
SResFetchReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
@ -1158,7 +1155,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
}
int32_t code = 0;
STaskCancelMsg *msg = pMsg->pCont;
STaskCancelReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task cancel msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
@ -1183,7 +1180,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
}
int32_t code = 0;
STaskDropMsg *msg = pMsg->pCont;
STaskDropReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task drop msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);

View File

@ -113,12 +113,12 @@ void *readyThread(void *param) {
uint32_t n = 0;
void *mockPointer = (void *)0x1;
void *mgmt = param;
SResReadyMsg readyMsg = {0};
SResReadyReq readyMsg = {0};
readyMsg.sId = htobe64(1);
readyMsg.queryId = htobe64(1);
readyMsg.taskId = htobe64(1);
readyRpc.pCont = &readyMsg;
readyRpc.contLen = sizeof(SResReadyMsg);
readyRpc.contLen = sizeof(SResReadyReq);
while (!testStop) {
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
@ -137,12 +137,12 @@ void *fetchThread(void *param) {
uint32_t n = 0;
void *mockPointer = (void *)0x1;
void *mgmt = param;
SResFetchMsg fetchMsg = {0};
SResFetchReq fetchMsg = {0};
fetchMsg.sId = htobe64(1);
fetchMsg.queryId = htobe64(1);
fetchMsg.taskId = htobe64(1);
fetchRpc.pCont = &fetchMsg;
fetchRpc.contLen = sizeof(SResFetchMsg);
fetchRpc.contLen = sizeof(SResFetchReq);
while (!testStop) {
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
@ -161,12 +161,12 @@ void *dropThread(void *param) {
uint32_t n = 0;
void *mockPointer = (void *)0x1;
void *mgmt = param;
STaskDropMsg dropMsg = {0};
STaskDropReq dropMsg = {0};
dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropMsg);
dropRpc.contLen = sizeof(STaskDropReq);
while (!testStop) {
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
@ -185,10 +185,10 @@ void *statusThread(void *param) {
uint32_t n = 0;
void *mockPointer = (void *)0x1;
void *mgmt = param;
SSchTasksStatusMsg statusMsg = {0};
SSchTasksStatusReq statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusMsg);
statusRpc.contLen = sizeof(SSchTasksStatusReq);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
while (!testStop) {
@ -228,31 +228,31 @@ TEST(seqTest, normalCase) {
queryRpc.pCont = queryMsg;
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
SResReadyMsg readyMsg = {0};
SResReadyReq readyMsg = {0};
readyMsg.sId = htobe64(1);
readyMsg.queryId = htobe64(1);
readyMsg.taskId = htobe64(1);
readyRpc.pCont = &readyMsg;
readyRpc.contLen = sizeof(SResReadyMsg);
readyRpc.contLen = sizeof(SResReadyReq);
SResFetchMsg fetchMsg = {0};
SResFetchReq fetchMsg = {0};
fetchMsg.sId = htobe64(1);
fetchMsg.queryId = htobe64(1);
fetchMsg.taskId = htobe64(1);
fetchRpc.pCont = &fetchMsg;
fetchRpc.contLen = sizeof(SResFetchMsg);
fetchRpc.contLen = sizeof(SResFetchReq);
STaskDropMsg dropMsg = {0};
STaskDropReq dropMsg = {0};
dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropMsg);
dropRpc.contLen = sizeof(STaskDropReq);
SSchTasksStatusMsg statusMsg = {0};
SSchTasksStatusReq statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusMsg);
statusRpc.contLen = sizeof(SSchTasksStatusReq);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
stubSetStringToPlan();
@ -312,17 +312,17 @@ TEST(seqTest, cancelFirst) {
queryRpc.pCont = queryMsg;
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
STaskDropMsg dropMsg = {0};
STaskDropReq dropMsg = {0};
dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropMsg);
dropRpc.contLen = sizeof(STaskDropReq);
SSchTasksStatusMsg statusMsg = {0};
SSchTasksStatusReq statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusMsg);
statusRpc.contLen = sizeof(SSchTasksStatusReq);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
stubSetStringToPlan();
@ -370,31 +370,31 @@ TEST(seqTest, randCase) {
queryRpc.pCont = queryMsg;
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
SResReadyMsg readyMsg = {0};
SResReadyReq readyMsg = {0};
readyMsg.sId = htobe64(1);
readyMsg.queryId = htobe64(1);
readyMsg.taskId = htobe64(1);
readyRpc.pCont = &readyMsg;
readyRpc.contLen = sizeof(SResReadyMsg);
readyRpc.contLen = sizeof(SResReadyReq);
SResFetchMsg fetchMsg = {0};
SResFetchReq fetchMsg = {0};
fetchMsg.sId = htobe64(1);
fetchMsg.queryId = htobe64(1);
fetchMsg.taskId = htobe64(1);
fetchRpc.pCont = &fetchMsg;
fetchRpc.contLen = sizeof(SResFetchMsg);
fetchRpc.contLen = sizeof(SResFetchReq);
STaskDropMsg dropMsg = {0};
STaskDropReq dropMsg = {0};
dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropMsg);
dropRpc.contLen = sizeof(STaskDropReq);
SSchTasksStatusMsg statusMsg = {0};
SSchTasksStatusReq statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusMsg);
statusRpc.contLen = sizeof(SSchTasksStatusReq);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
stubSetStringToPlan();

View File

@ -593,7 +593,7 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms
if (rspCode != TSDB_CODE_SUCCESS || NULL == msg) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
} else {
SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg;
SShellSubmitRsp *rsp = (SShellSubmitRsp *)msg;
job->resNumOfRows += rsp->affectedRows;
code = schProcessOnTaskSuccess(job, task);
@ -829,14 +829,14 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
break;
}
case TDMT_VND_RES_READY: {
msgSize = sizeof(SResReadyMsg);
msgSize = sizeof(SResReadyReq);
msg = calloc(1, msgSize);
if (NULL == msg) {
SCH_TASK_ELOG("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SResReadyMsg *pMsg = msg;
SResReadyReq *pMsg = msg;
pMsg->header.vgId = htonl(addr->nodeId);
@ -846,14 +846,14 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
break;
}
case TDMT_VND_FETCH: {
msgSize = sizeof(SResFetchMsg);
msgSize = sizeof(SResFetchReq);
msg = calloc(1, msgSize);
if (NULL == msg) {
SCH_TASK_ELOG("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SResFetchMsg *pMsg = msg;
SResFetchReq *pMsg = msg;
pMsg->header.vgId = htonl(addr->nodeId);
@ -863,14 +863,14 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
break;
}
case TDMT_VND_DROP_TASK:{
msgSize = sizeof(STaskDropMsg);
msgSize = sizeof(STaskDropReq);
msg = calloc(1, msgSize);
if (NULL == msg) {
SCH_TASK_ELOG("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
STaskDropMsg *pMsg = msg;
STaskDropReq *pMsg = msg;
pMsg->header.vgId = htonl(addr->nodeId);

View File

@ -186,7 +186,7 @@ void *schtSendRsp(void *param) {
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SShellSubmitRspMsg rsp = {0};
SShellSubmitRsp rsp = {0};
rsp.affectedRows = 10;
schProcessRspMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0);

View File

@ -22,10 +22,14 @@ extern "C" {
#include "os.h"
#include "tkvDef.h"
typedef struct SDiskMgr SDiskMgr;
int tdmReadPage(SDiskMgr *pDiskMgr, int32_t pgid, void *pData);
int tdmWritePage(SDiskMgr *pDiskMgr, int32_t pgid, const void *pData);
int tdmOpen(SDiskMgr **ppDiskMgr, const char *fname, uint16_t pgsize);
int tdmClose(SDiskMgr *pDiskMgr);
int tdmReadPage(SDiskMgr *pDiskMgr, pgid_t pgid, void *pData);
int tdmWritePage(SDiskMgr *pDiskMgr, pgid_t pgid, const void *pData);
int32_t tdmAllocPage(SDiskMgr *pDiskMgr);
#ifdef __cplusplus

View File

@ -0,0 +1,41 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TKV_PAGE_H_
#define _TD_TKV_PAGE_H_
#include "os.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
uint16_t dbver;
uint16_t pgsize;
uint32_t cksm;
} SPgHdr;
typedef struct {
SPgHdr chdr;
uint16_t used; // number of used slots
uint16_t loffset; // the offset of the starting location of the last slot used
} SSlottedPgHdr;
#ifdef __cplusplus
}
#endif
#endif /*_TD_TKV_PAGE_H_*/

View File

@ -13,15 +13,21 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TKV_MACRO_H_
#define _TD_TKV_MACRO_H_
#ifndef _TD_TKV_DEF_H_
#define _TD_TKV_DEF_H_
#include "os.h"
#ifdef __cplusplus
extern "C" {
#endif
// pgid_t
typedef int32_t pgid_t;
#define TKV_IVLD_PGID ((pgid_t)-1)
#ifdef __cplusplus
}
#endif
#endif /*_TD_TKV_MACRO_H_*/
#endif /*_TD_TKV_DEF_H_*/

View File

@ -24,13 +24,23 @@ struct SDiskMgr {
#define PAGE_OFFSET(PGID, PGSIZE) ((PGID) * (PGSIZE))
int tdmReadPage(SDiskMgr *pDiskMgr, int32_t pgid, void *pData) {
int tdmOpen(SDiskMgr **ppDiskMgr, const char *fname, uint16_t pgsize) {
// TODO
return 0;
}
int tdmClose(SDiskMgr *pDiskMgr) {
// TODO
return 0;
}
int tdmReadPage(SDiskMgr *pDiskMgr, pgid_t pgid, void *pData) {
taosLSeekFile(pDiskMgr->fd, PAGE_OFFSET(pgid, pDiskMgr->pgsize), SEEK_SET);
taosReadFile(pDiskMgr->fd, pData, pDiskMgr->pgsize);
return 0;
}
int tdmWritePage(SDiskMgr *pDiskMgr, int32_t pgid, const void *pData) {
int tdmWritePage(SDiskMgr *pDiskMgr, pgid_t pgid, const void *pData) {
taosLSeekFile(pDiskMgr->fd, PAGE_OFFSET(pgid, pDiskMgr->pgsize), SEEK_SET);
taosWriteFile(pDiskMgr->fd, pData, pDiskMgr->pgsize);
return 0;

View File

@ -0,0 +1,10 @@
#include "gtest/gtest.h"
#include "iostream"
#include "tDiskMgr.h"
TEST(tDiskMgrTest, simple_test) {
// TODO
std::cout << "This is in tDiskMgrTest::simple_test" << std::endl;
}

View File

@ -85,4 +85,23 @@ if $data02 != 2 then
return -1
endi
return
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
system sh/exec.sh -n dnode1 -s start
sql show databases
if $rows != 1 then
return -1
endi
sql_error use d1
sql use d4
sql show vgroups
if $rows != 2 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -338,7 +338,7 @@ void *taosNetInitRpc(char *secretEncrypt, char spi) {
return pRpcConn;
}
static int32_t taosNetCheckRpc(const char* serverFqdn, uint16_t port, uint16_t pktLen, char spi, SStartupMsg *pStep) {
static int32_t taosNetCheckRpc(const char* serverFqdn, uint16_t port, uint16_t pktLen, char spi, SStartupReq *pStep) {
SRpcEpSet epSet;
SRpcMsg reqMsg;
SRpcMsg rspMsg;
@ -374,7 +374,7 @@ static int32_t taosNetCheckRpc(const char* serverFqdn, uint16_t port, uint16_t p
}
int32_t code = 0;
if (pStep != NULL && rspMsg.pCont != NULL && rspMsg.contLen > 0 && rspMsg.contLen <= sizeof(SStartupMsg)) {
if (pStep != NULL && rspMsg.pCont != NULL && rspMsg.contLen > 0 && rspMsg.contLen <= sizeof(SStartupReq)) {
memcpy(pStep, rspMsg.pCont, rspMsg.contLen);
code = 1;
}
@ -384,8 +384,8 @@ static int32_t taosNetCheckRpc(const char* serverFqdn, uint16_t port, uint16_t p
return code;
}
static int32_t taosNetParseStartup(SStartupMsg *pCont) {
SStartupMsg *pStep = pCont;
static int32_t taosNetParseStartup(SStartupReq *pCont) {
SStartupReq *pStep = pCont;
uInfo("step:%s desc:%s", pStep->name, pStep->desc);
if (pStep->finished) {
@ -398,7 +398,7 @@ static int32_t taosNetParseStartup(SStartupMsg *pCont) {
static void taosNetTestStartup(char *host, int32_t port) {
uInfo("check startup, host:%s port:%d\n", host, port);
SStartupMsg *pStep = malloc(sizeof(SStartupMsg));
SStartupReq *pStep = malloc(sizeof(SStartupReq));
while (1) {
int32_t code = taosNetCheckRpc(host, port + TSDB_PORT_DNODEDNODE, 20, 0, pStep);
if (code > 0) {