Merge remote-tracking branch 'origin/3.0' into feature/3.0_liaohj
This commit is contained in:
commit
506dd5905a
|
@ -133,47 +133,18 @@ typedef enum _mgmt_table {
|
|||
#define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC)
|
||||
#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0)
|
||||
|
||||
typedef struct SKlv {
|
||||
typedef struct SKv {
|
||||
int32_t keyLen;
|
||||
int32_t valueLen;
|
||||
void* key;
|
||||
void* value;
|
||||
} SKlv;
|
||||
} SKv;
|
||||
|
||||
static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) {
|
||||
int tlen = 0;
|
||||
tlen += taosEncodeFixedI32(buf, pKlv->keyLen);
|
||||
tlen += taosEncodeFixedI32(buf, pKlv->valueLen);
|
||||
tlen += taosEncodeBinary(buf, pKlv->key, pKlv->keyLen);
|
||||
tlen += taosEncodeBinary(buf, pKlv->value, pKlv->valueLen);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) {
|
||||
buf = taosDecodeFixedI32(buf, &pKlv->keyLen);
|
||||
buf = taosDecodeFixedI32(buf, &pKlv->valueLen);
|
||||
buf = taosDecodeBinary(buf, &pKlv->key, pKlv->keyLen);
|
||||
buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen);
|
||||
return buf;
|
||||
}
|
||||
typedef struct SClientHbKey {
|
||||
int32_t connId;
|
||||
int32_t hbType;
|
||||
} SClientHbKey;
|
||||
|
||||
static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) {
|
||||
int tlen = 0;
|
||||
tlen += taosEncodeFixedI32(buf, pKey->connId);
|
||||
tlen += taosEncodeFixedI32(buf, pKey->hbType);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) {
|
||||
buf = taosDecodeFixedI32(buf, &pKey->connId);
|
||||
buf = taosDecodeFixedI32(buf, &pKey->hbType);
|
||||
return buf;
|
||||
}
|
||||
|
||||
typedef struct SClientHbReq {
|
||||
SClientHbKey connKey;
|
||||
SHashObj* info; // hash<Slv.key, Sklv>
|
||||
|
@ -184,9 +155,6 @@ typedef struct SClientHbBatchReq {
|
|||
SArray* reqs; // SArray<SClientHbReq>
|
||||
} SClientHbBatchReq;
|
||||
|
||||
int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq);
|
||||
void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq);
|
||||
|
||||
typedef struct SClientHbRsp {
|
||||
SClientHbKey connKey;
|
||||
int32_t status;
|
||||
|
@ -200,6 +168,58 @@ typedef struct SClientHbBatchRsp {
|
|||
SArray* rsps; // SArray<SClientHbRsp>
|
||||
} SClientHbBatchRsp;
|
||||
|
||||
static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) {
|
||||
return taosIntHash_64(key, keyLen);
|
||||
}
|
||||
|
||||
int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq);
|
||||
void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq);
|
||||
|
||||
static FORCE_INLINE void tFreeClientHbReq(void *pReq) {
|
||||
SClientHbReq* req = (SClientHbReq*)pReq;
|
||||
taosHashCleanup(req->info);
|
||||
free(pReq);
|
||||
}
|
||||
|
||||
int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq);
|
||||
void* tDeserializeClientHbBatchReq(void* buf, SClientHbBatchReq* pReq);
|
||||
|
||||
static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq) {
|
||||
SClientHbBatchReq *req = (SClientHbBatchReq*)pReq;
|
||||
taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
|
||||
free(pReq);
|
||||
}
|
||||
|
||||
static FORCE_INLINE int taosEncodeSKv(void** buf, const SKv* pKv) {
|
||||
int tlen = 0;
|
||||
tlen += taosEncodeFixedI32(buf, pKv->keyLen);
|
||||
tlen += taosEncodeFixedI32(buf, pKv->valueLen);
|
||||
tlen += taosEncodeBinary(buf, pKv->key, pKv->keyLen);
|
||||
tlen += taosEncodeBinary(buf, pKv->value, pKv->valueLen);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* taosDecodeSKv(void* buf, SKv* pKv) {
|
||||
buf = taosDecodeFixedI32(buf, &pKv->keyLen);
|
||||
buf = taosDecodeFixedI32(buf, &pKv->valueLen);
|
||||
buf = taosDecodeBinary(buf, &pKv->key, pKv->keyLen);
|
||||
buf = taosDecodeBinary(buf, &pKv->value, pKv->valueLen);
|
||||
return buf;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) {
|
||||
int tlen = 0;
|
||||
tlen += taosEncodeFixedI32(buf, pKey->connId);
|
||||
tlen += taosEncodeFixedI32(buf, pKey->hbType);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) {
|
||||
buf = taosDecodeFixedI32(buf, &pKey->connId);
|
||||
buf = taosDecodeFixedI32(buf, &pKey->hbType);
|
||||
return buf;
|
||||
}
|
||||
|
||||
typedef struct SBuildTableMetaInput {
|
||||
int32_t vgId;
|
||||
char* dbName;
|
||||
|
|
|
@ -23,6 +23,7 @@ extern "C" {
|
|||
#include "query.h"
|
||||
#include "tmsg.h"
|
||||
#include "tarray.h"
|
||||
#include "trpc.h"
|
||||
|
||||
#define QUERY_TYPE_MERGE 1
|
||||
#define QUERY_TYPE_PARTIAL 2
|
||||
|
|
|
@ -103,55 +103,6 @@ typedef struct STableMetaOutput {
|
|||
STableMeta *tbMeta;
|
||||
} STableMetaOutput;
|
||||
|
||||
typedef struct SDataBuf {
|
||||
void *pData;
|
||||
uint32_t len;
|
||||
} SDataBuf;
|
||||
|
||||
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
|
||||
typedef int32_t (*__async_exec_fn_t)(void* param);
|
||||
|
||||
typedef struct SMsgSendInfo {
|
||||
__async_send_cb_fn_t fp; //async callback function
|
||||
void *param;
|
||||
uint64_t requestId;
|
||||
uint64_t requestObjRefId;
|
||||
int32_t msgType;
|
||||
SDataBuf msgInfo;
|
||||
} SMsgSendInfo;
|
||||
|
||||
typedef struct SQueryNodeAddr{
|
||||
int32_t nodeId; //vgId or qnodeId
|
||||
int8_t inUse;
|
||||
int8_t numOfEps;
|
||||
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
|
||||
} SQueryNodeAddr;
|
||||
|
||||
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
|
||||
|
||||
int32_t initTaskQueue();
|
||||
int32_t cleanupTaskQueue();
|
||||
|
||||
/**
|
||||
*
|
||||
* @param execFn The asynchronously execution function
|
||||
* @param execParam The parameters of the execFn
|
||||
* @param code The response code during execution the execFn
|
||||
* @return
|
||||
*/
|
||||
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
|
||||
|
||||
/**
|
||||
* Asynchronously send message to server, after the response received, the callback will be incured.
|
||||
*
|
||||
* @param pTransporter
|
||||
* @param epSet
|
||||
* @param pTransporterId
|
||||
* @param pInfo
|
||||
* @return
|
||||
*/
|
||||
int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
|
||||
|
||||
const SSchema* tGetTbnameColumnSchema();
|
||||
void initQueryModuleMsgHandle();
|
||||
|
||||
|
|
|
@ -84,6 +84,55 @@ void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp)
|
|||
int rpcReportProgress(void *pConn, char *pCont, int contLen);
|
||||
void rpcCancelRequest(int64_t rid);
|
||||
|
||||
typedef struct SDataBuf {
|
||||
void *pData;
|
||||
uint32_t len;
|
||||
} SDataBuf;
|
||||
|
||||
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
|
||||
typedef int32_t (*__async_exec_fn_t)(void* param);
|
||||
|
||||
typedef struct SMsgSendInfo {
|
||||
__async_send_cb_fn_t fp; //async callback function
|
||||
void *param;
|
||||
uint64_t requestId;
|
||||
uint64_t requestObjRefId;
|
||||
int32_t msgType;
|
||||
SDataBuf msgInfo;
|
||||
} SMsgSendInfo;
|
||||
|
||||
typedef struct SQueryNodeAddr{
|
||||
int32_t nodeId; //vgId or qnodeId
|
||||
int8_t inUse;
|
||||
int8_t numOfEps;
|
||||
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
|
||||
} SQueryNodeAddr;
|
||||
|
||||
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
|
||||
|
||||
int32_t initTaskQueue();
|
||||
int32_t cleanupTaskQueue();
|
||||
|
||||
/**
|
||||
*
|
||||
* @param execFn The asynchronously execution function
|
||||
* @param execParam The parameters of the execFn
|
||||
* @param code The response code during execution the execFn
|
||||
* @return
|
||||
*/
|
||||
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
|
||||
|
||||
/**
|
||||
* Asynchronously send message to server, after the response received, the callback will be incured.
|
||||
*
|
||||
* @param pTransporter
|
||||
* @param epSet
|
||||
* @param pTransporterId
|
||||
* @param pInfo
|
||||
* @return
|
||||
*/
|
||||
int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -214,6 +214,24 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p);
|
|||
int32_t taosHashGetKey(void *data, void** key, size_t* keyLen);
|
||||
|
||||
|
||||
/**
|
||||
* Get the corresponding key information for a given data in hash table, using memcpy
|
||||
* @param data
|
||||
* @param dst
|
||||
* @return
|
||||
*/
|
||||
static FORCE_INLINE int32_t taosHashCopyKey(void *data, void* dst) {
|
||||
if (NULL == data || NULL == dst) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
SHashNode * node = GET_HASH_PNODE(data);
|
||||
void* key = GET_HASH_NODE_KEY(node);
|
||||
memcpy(dst, key, node->keyLen);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the corresponding data length for a given data in hash table
|
||||
* @param data
|
||||
|
|
|
@ -18,36 +18,61 @@
|
|||
#include "thash.h"
|
||||
#include "tmsg.h"
|
||||
|
||||
#define HEARTBEAT_INTERVAL 1500 // ms
|
||||
|
||||
typedef enum {
|
||||
mq = 0,
|
||||
// type can be added here
|
||||
HEARTBEAT_TYPE_MQ = 0,
|
||||
// types can be added here
|
||||
//
|
||||
HEARTBEAT_TYPE_MAX
|
||||
} EHbType;
|
||||
|
||||
typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq);
|
||||
typedef int32_t (*FGetConnInfo)(SClientHbKey connKey, void* param);
|
||||
|
||||
typedef struct SAppHbMgr {
|
||||
// statistics
|
||||
int32_t reportCnt;
|
||||
int32_t connKeyCnt;
|
||||
int64_t reportBytes; // not implemented
|
||||
int64_t startTime;
|
||||
// ctl
|
||||
SRWLatch lock; // lock is used in serialization
|
||||
// connection
|
||||
void* transporter;
|
||||
SEpSet epSet;
|
||||
// info
|
||||
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
|
||||
SHashObj* getInfoFuncs; // hash<SClientHbKey, FGetConnInfo>
|
||||
} SAppHbMgr;
|
||||
|
||||
typedef struct SClientHbMgr {
|
||||
int8_t inited;
|
||||
int32_t reportInterval; // unit ms
|
||||
int32_t stats;
|
||||
SRWLatch lock;
|
||||
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
|
||||
SHashObj* getInfoFuncs; // hash<SClientHbKey, FGetConnInfo>
|
||||
FHbRspHandle handle[HEARTBEAT_TYPE_MAX];
|
||||
// input queue
|
||||
int8_t inited;
|
||||
// ctl
|
||||
int8_t threadStop;
|
||||
pthread_t thread;
|
||||
pthread_mutex_t lock; // used when app init and cleanup
|
||||
SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster
|
||||
FHbRspHandle handle[HEARTBEAT_TYPE_MAX];
|
||||
} SClientHbMgr;
|
||||
|
||||
static SClientHbMgr clientHbMgr = {0};
|
||||
// TODO: embed param into function
|
||||
// return type: SArray<Skv>
|
||||
typedef SArray* (*FGetConnInfo)(SClientHbKey connKey, void* param);
|
||||
|
||||
// global, called by mgmt
|
||||
int hbMgrInit();
|
||||
void hbMgrCleanUp();
|
||||
int hbHandleRsp(void* hbMsg);
|
||||
int hbHandleRsp(SClientHbBatchRsp* hbRsp);
|
||||
|
||||
// cluster level
|
||||
SAppHbMgr* appHbMgrInit(void* transporter, SEpSet epSet);
|
||||
void appHbMgrCleanup(SAppHbMgr* pAppHbMgr);
|
||||
|
||||
int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func);
|
||||
// conn level
|
||||
int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func);
|
||||
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);
|
||||
|
||||
int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);
|
||||
|
||||
int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);
|
||||
|
||||
// mq
|
||||
void hbMgrInitMqHbRspHandle();
|
||||
|
|
|
@ -14,64 +14,214 @@
|
|||
*/
|
||||
|
||||
#include "clientHb.h"
|
||||
#include "trpc.h"
|
||||
|
||||
static int32_t mqHbRspHandle(SClientHbRsp* pReq) {
|
||||
static SClientHbMgr clientHbMgr = {0};
|
||||
|
||||
static int32_t hbCreateThread();
|
||||
static void hbStopThread();
|
||||
|
||||
static int32_t hbMqHbRspHandle(SClientHbRsp* pReq) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) {
|
||||
void hbMgrInitMqHbRspHandle() {
|
||||
clientHbMgr.handle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void hbMgrInitHandle() {
|
||||
// init all handle
|
||||
hbMgrInitMqHbRspHandle();
|
||||
}
|
||||
|
||||
SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
|
||||
SClientHbBatchReq* pReq = malloc(sizeof(SClientHbBatchReq));
|
||||
if (pReq == NULL) {
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
|
||||
pReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
|
||||
|
||||
void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
|
||||
while (pIter != NULL) {
|
||||
taosArrayPush(pReq->reqs, pIter);
|
||||
SClientHbReq* pOneReq = pIter;
|
||||
taosHashClear(pOneReq->info);
|
||||
|
||||
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, NULL);
|
||||
while (pIter != NULL) {
|
||||
FGetConnInfo getConnInfoFp = (FGetConnInfo)pIter;
|
||||
SClientHbKey connKey;
|
||||
taosHashCopyKey(pIter, &connKey);
|
||||
getConnInfoFp(connKey, NULL);
|
||||
|
||||
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
|
||||
}
|
||||
|
||||
return pReq;
|
||||
}
|
||||
|
||||
static void* hbThreadFunc(void* param) {
|
||||
setThreadName("hb");
|
||||
while (1) {
|
||||
int8_t threadStop = atomic_load_8(&clientHbMgr.threadStop);
|
||||
if(threadStop) {
|
||||
break;
|
||||
}
|
||||
|
||||
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
|
||||
for(int i = 0; i < sz; i++) {
|
||||
SAppHbMgr* pAppHbMgr = taosArrayGet(clientHbMgr.appHbMgrs, i);
|
||||
SClientHbBatchReq* pReq = hbGatherAllInfo(pAppHbMgr);
|
||||
void* reqStr = NULL;
|
||||
int tlen = tSerializeSClientHbBatchReq(&reqStr, pReq);
|
||||
SMsgSendInfo info;
|
||||
/*info.fp = hbHandleRsp;*/
|
||||
|
||||
int64_t transporterId = 0;
|
||||
asyncSendMsgToServer(pAppHbMgr->transporter, &pAppHbMgr->epSet, &transporterId, &info);
|
||||
tFreeClientHbBatchReq(pReq);
|
||||
|
||||
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
|
||||
taosMsleep(HEARTBEAT_INTERVAL);
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int32_t hbCreateThread() {
|
||||
pthread_attr_t thAttr;
|
||||
pthread_attr_init(&thAttr);
|
||||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
if (pthread_create(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
pthread_attr_destroy(&thAttr);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void hbMgrInitMqHbFunc() {
|
||||
clientHbMgr.handle[mq] = mqHbRspHandle;
|
||||
static void hbStopThread() {
|
||||
atomic_store_8(&clientHbMgr.threadStop, 1);
|
||||
}
|
||||
|
||||
SAppHbMgr* appHbMgrInit(void* transporter, SEpSet epSet) {
|
||||
SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr));
|
||||
if (pAppHbMgr == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
// init stat
|
||||
pAppHbMgr->startTime = taosGetTimestampMs();
|
||||
|
||||
// init connection info
|
||||
pAppHbMgr->transporter = transporter;
|
||||
pAppHbMgr->epSet = epSet;
|
||||
|
||||
// init hash info
|
||||
pAppHbMgr->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
|
||||
pAppHbMgr->activeInfo->freeFp = tFreeClientHbReq;
|
||||
// init getInfoFunc
|
||||
pAppHbMgr->getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
|
||||
|
||||
taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr);
|
||||
return pAppHbMgr;
|
||||
}
|
||||
|
||||
void appHbMgrCleanup(SAppHbMgr* pAppHbMgr) {
|
||||
pthread_mutex_lock(&clientHbMgr.lock);
|
||||
|
||||
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
|
||||
for (int i = 0; i < sz; i++) {
|
||||
SAppHbMgr* pTarget = taosArrayGet(clientHbMgr.appHbMgrs, i);
|
||||
if (pAppHbMgr == pTarget) {
|
||||
taosHashCleanup(pTarget->activeInfo);
|
||||
taosHashCleanup(pTarget->getInfoFuncs);
|
||||
}
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&clientHbMgr.lock);
|
||||
}
|
||||
|
||||
int hbMgrInit() {
|
||||
//init once
|
||||
// init once
|
||||
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
|
||||
if (old == 1) return 0;
|
||||
|
||||
//init config
|
||||
clientHbMgr.reportInterval = 1500;
|
||||
clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void*));
|
||||
pthread_mutex_init(&clientHbMgr.lock, NULL);
|
||||
|
||||
//init stat
|
||||
clientHbMgr.stats = 0;
|
||||
|
||||
//init lock
|
||||
taosInitRWLatch(&clientHbMgr.lock);
|
||||
// init handle funcs
|
||||
hbMgrInitHandle();
|
||||
|
||||
//init handle funcs
|
||||
hbMgrInitMqHbFunc();
|
||||
// init backgroud thread
|
||||
hbCreateThread();
|
||||
|
||||
//init hash info
|
||||
clientHbMgr.activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
|
||||
//init getInfoFunc
|
||||
clientHbMgr.getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void hbMgrCleanUp() {
|
||||
// destroy all appHbMgr
|
||||
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
|
||||
if (old == 0) return;
|
||||
|
||||
taosHashCleanup(clientHbMgr.activeInfo);
|
||||
taosHashCleanup(clientHbMgr.getInfoFuncs);
|
||||
taosArrayDestroy(clientHbMgr.appHbMgrs);
|
||||
|
||||
}
|
||||
|
||||
int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func) {
|
||||
|
||||
int hbHandleRsp(SClientHbBatchRsp* hbRsp) {
|
||||
int64_t reqId = hbRsp->reqId;
|
||||
int64_t rspId = hbRsp->rspId;
|
||||
|
||||
SArray* rsps = hbRsp->rsps;
|
||||
int32_t sz = taosArrayGetSize(rsps);
|
||||
for (int i = 0; i < sz; i++) {
|
||||
SClientHbRsp* pRsp = taosArrayGet(rsps, i);
|
||||
if (pRsp->connKey.hbType < HEARTBEAT_TYPE_MAX) {
|
||||
clientHbMgr.handle[pRsp->connKey.hbType](pRsp);
|
||||
} else {
|
||||
// discard rsp
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) {
|
||||
//lock
|
||||
int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func) {
|
||||
// init hash in activeinfo
|
||||
void* data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
|
||||
if (data != NULL) {
|
||||
return 0;
|
||||
}
|
||||
SClientHbReq hbReq;
|
||||
hbReq.connKey = connKey;
|
||||
hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
|
||||
taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq));
|
||||
// init hash
|
||||
if (func != NULL) {
|
||||
taosHashPut(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey), func, sizeof(FGetConnInfo));
|
||||
}
|
||||
|
||||
atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) {
|
||||
taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
|
||||
taosHashRemove(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey));
|
||||
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
|
||||
}
|
||||
|
||||
int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) {
|
||||
// find req by connection id
|
||||
SClientHbReq* pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
|
||||
ASSERT(pReq != NULL);
|
||||
|
||||
taosHashPut(pReq->info, key, keyLen, value, valueLen);
|
||||
|
||||
//find req by connection id
|
||||
SClientHbReq* data = taosHashGet(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey));
|
||||
ASSERT(data != NULL);
|
||||
taosHashPut(data->info, key, keyLen, value, valueLen);
|
||||
|
||||
//unlock
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -305,6 +305,8 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
|
|||
} else {
|
||||
CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pDag), _return);
|
||||
CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag, &pJob), _return);
|
||||
pRequest->code = terrno;
|
||||
return pRequest;
|
||||
}
|
||||
|
||||
_return:
|
||||
|
|
|
@ -89,17 +89,17 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) {
|
|||
int tlen = 0;
|
||||
tlen += taosEncodeSClientHbKey(buf, &pReq->connKey);
|
||||
|
||||
void *pIter = NULL;
|
||||
void *data;
|
||||
SKlv klv;
|
||||
data = taosHashIterate(pReq->info, pIter);
|
||||
while (data != NULL) {
|
||||
taosHashGetKey(data, &klv.key, (size_t *)&klv.keyLen);
|
||||
klv.valueLen = taosHashGetDataLen(data);
|
||||
klv.value = data;
|
||||
taosEncodeSKlv(buf, &klv);
|
||||
int kvNum = taosHashGetSize(pReq->info);
|
||||
tlen += taosEncodeFixedI32(buf, kvNum);
|
||||
SKv kv;
|
||||
void* pIter = taosHashIterate(pReq->info, pIter);
|
||||
while (pIter != NULL) {
|
||||
taosHashGetKey(pIter, &kv.key, (size_t *)&kv.keyLen);
|
||||
kv.valueLen = taosHashGetDataLen(pIter);
|
||||
kv.value = pIter;
|
||||
tlen += taosEncodeSKv(buf, &kv);
|
||||
|
||||
data = taosHashIterate(pReq->info, pIter);
|
||||
pIter = taosHashIterate(pReq->info, pIter);
|
||||
}
|
||||
return tlen;
|
||||
}
|
||||
|
@ -109,16 +109,27 @@ void *tDeserializeClientHbReq(void *buf, SClientHbReq *pReq) {
|
|||
buf = taosDecodeSClientHbKey(buf, &pReq->connKey);
|
||||
|
||||
// TODO: error handling
|
||||
if (pReq->info == NULL) {
|
||||
pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
int kvNum;
|
||||
taosDecodeFixedI32(buf, &kvNum);
|
||||
pReq->info = taosHashInit(kvNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
for(int i = 0; i < kvNum; i++) {
|
||||
SKv kv;
|
||||
buf = taosDecodeSKv(buf, &kv);
|
||||
taosHashPut(pReq->info, kv.key, kv.keyLen, kv.value, kv.valueLen);
|
||||
}
|
||||
SKlv klv;
|
||||
buf = taosDecodeSKlv(buf, &klv);
|
||||
taosHashPut(pReq->info, klv.key, klv.keyLen, klv.value, klv.valueLen);
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq) {
|
||||
int tlen = 0;
|
||||
return tlen;
|
||||
}
|
||||
|
||||
void* tDeserializeClientHbBatchReq(void* buf, SClientHbBatchReq* pReq) {
|
||||
return buf;
|
||||
}
|
||||
|
||||
int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
||||
int tlen = 0;
|
||||
|
||||
|
|
|
@ -31,8 +31,8 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
|
|||
SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup);
|
||||
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
|
||||
|
||||
SCreateVnodeReq *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
|
||||
SDropVnodeReq *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
|
||||
SCreateVnodeReq *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
|
||||
SDropVnodeReq *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -177,7 +177,7 @@ static int32_t mndAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct) {
|
|||
}
|
||||
|
||||
static int32_t mndAcctActionUpdate(SSdb *pSdb, SAcctObj *pOld, SAcctObj *pNew) {
|
||||
mTrace("acct:%s, perform update action, old_row:%p new_row:%p", pOld->acct, pOld, pNew);
|
||||
mTrace("acct:%s, perform update action, old row:%p new row:%p", pOld->acct, pOld, pNew);
|
||||
|
||||
pOld->updateTime = pNew->updateTime;
|
||||
pOld->status = pNew->status;
|
||||
|
|
|
@ -155,7 +155,7 @@ static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj) {
|
|||
}
|
||||
|
||||
static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOld, SBnodeObj *pNew) {
|
||||
mTrace("bnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew);
|
||||
mTrace("bnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
|
||||
pOld->updateTime = pNew->updateTime;
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -135,7 +135,7 @@ static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) {
|
|||
}
|
||||
|
||||
static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOld, SClusterObj *pNew) {
|
||||
mTrace("cluster:%" PRId64 ", perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew);
|
||||
mTrace("cluster:%" PRId64 ", perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb);
|
|||
static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw);
|
||||
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb);
|
||||
static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb);
|
||||
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOldDb, SDbObj *pNewDb);
|
||||
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew);
|
||||
static int32_t mndProcessCreateDbReq(SMnodeMsg *pReq);
|
||||
static int32_t mndProcessAlterDbReq(SMnodeMsg *pReq);
|
||||
static int32_t mndProcessDropDbReq(SMnodeMsg *pReq);
|
||||
|
@ -182,12 +182,12 @@ static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOldDb, SDbObj *pNewDb) {
|
||||
mTrace("db:%s, perform update action, old_row:%p new_row:%p", pOldDb->name, pOldDb, pNewDb);
|
||||
pOldDb->updateTime = pNewDb->updateTime;
|
||||
pOldDb->cfgVersion = pNewDb->cfgVersion;
|
||||
pOldDb->vgVersion = pNewDb->vgVersion;
|
||||
memcpy(&pOldDb->cfg, &pNewDb->cfg, sizeof(SDbCfg));
|
||||
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) {
|
||||
mTrace("db:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
|
||||
pOld->updateTime = pNew->updateTime;
|
||||
pOld->cfgVersion = pNew->cfgVersion;
|
||||
pOld->vgVersion = pNew->vgVersion;
|
||||
memcpy(&pOld->cfg, &pNew->cfg, sizeof(SDbCfg));
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -331,14 +331,15 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
action.epSet = mndGetDnodeEpset(pDnode);
|
||||
mndReleaseDnode(pMnode, pDnode);
|
||||
|
||||
SCreateVnodeReq *pMsg = mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup);
|
||||
if (pMsg == NULL) return -1;
|
||||
SCreateVnodeReq *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup);
|
||||
if (pReq == NULL) return -1;
|
||||
|
||||
action.pCont = pMsg;
|
||||
action.pCont = pReq;
|
||||
action.contLen = sizeof(SCreateVnodeReq);
|
||||
action.msgType = TDMT_DND_CREATE_VNODE;
|
||||
action.acceptableCode = TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
free(pMsg);
|
||||
free(pReq);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -360,14 +361,15 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
action.epSet = mndGetDnodeEpset(pDnode);
|
||||
mndReleaseDnode(pMnode, pDnode);
|
||||
|
||||
SDropVnodeReq *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup);
|
||||
if (pMsg == NULL) return -1;
|
||||
SDropVnodeReq *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup);
|
||||
if (pReq == NULL) return -1;
|
||||
|
||||
action.pCont = pMsg;
|
||||
action.pCont = pReq;
|
||||
action.contLen = sizeof(SDropVnodeReq);
|
||||
action.msgType = TDMT_DND_DROP_VNODE;
|
||||
action.acceptableCode = TSDB_CODE_DND_VNODE_NOT_DEPLOYED;
|
||||
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
|
||||
free(pMsg);
|
||||
free(pReq);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -376,7 +378,7 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreate, SUserObj *pUser) {
|
||||
static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pReq, SCreateDbMsg *pCreate, SUserObj *pUser) {
|
||||
SDbObj dbObj = {0};
|
||||
memcpy(dbObj.name, pCreate->db, TSDB_DB_FNAME_LEN);
|
||||
memcpy(dbObj.acct, pUser->acct, TSDB_USER_LEN);
|
||||
|
@ -425,43 +427,17 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat
|
|||
}
|
||||
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("db:%s, failed to create since %s", pCreate->db, terrstr());
|
||||
goto CREATE_DB_OVER;
|
||||
}
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto CREATE_DB_OVER;
|
||||
|
||||
mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
|
||||
|
||||
if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) {
|
||||
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
|
||||
goto CREATE_DB_OVER;
|
||||
}
|
||||
|
||||
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) {
|
||||
mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr());
|
||||
goto CREATE_DB_OVER;
|
||||
}
|
||||
|
||||
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) {
|
||||
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
|
||||
goto CREATE_DB_OVER;
|
||||
}
|
||||
|
||||
if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) {
|
||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
||||
goto CREATE_DB_OVER;
|
||||
}
|
||||
|
||||
if (mndSetCreateDbUndoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) {
|
||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
||||
goto CREATE_DB_OVER;
|
||||
}
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
goto CREATE_DB_OVER;
|
||||
}
|
||||
if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
|
||||
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
|
||||
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
|
||||
if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
|
||||
if (mndSetCreateDbUndoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_DB_OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
|
@ -471,9 +447,9 @@ CREATE_DB_OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndProcessCreateDbReq(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
SCreateDbMsg *pCreate = pMsg->rpcMsg.pCont;
|
||||
static int32_t mndProcessCreateDbReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SCreateDbMsg *pCreate = pReq->rpcMsg.pCont;
|
||||
|
||||
pCreate->numOfVgroups = htonl(pCreate->numOfVgroups);
|
||||
pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize);
|
||||
|
@ -502,13 +478,13 @@ static int32_t mndProcessCreateDbReq(SMnodeMsg *pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
SUserObj *pOperUser = mndAcquireUser(pMnode, pMsg->user);
|
||||
SUserObj *pOperUser = mndAcquireUser(pMnode, pReq->user);
|
||||
if (pOperUser == NULL) {
|
||||
mError("db:%s, failed to create since %s", pCreate->db, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t code = mndCreateDb(pMnode, pMsg, pCreate, pOperUser);
|
||||
int32_t code = mndCreateDb(pMnode, pReq, pCreate, pOperUser);
|
||||
mndReleaseUser(pMnode, pOperUser);
|
||||
|
||||
if (code != 0) {
|
||||
|
@ -565,8 +541,8 @@ static int32_t mndSetDbCfgFromAlterDbMsg(SDbObj *pDb, SAlterDbMsg *pAlter) {
|
|||
return terrno;
|
||||
}
|
||||
|
||||
static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) {
|
||||
SSdbRaw *pRedoRaw = mndDbActionEncode(pOldDb);
|
||||
static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
|
||||
SSdbRaw *pRedoRaw = mndDbActionEncode(pOld);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_UPDATING) != 0) return -1;
|
||||
|
@ -574,8 +550,8 @@ static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pO
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) {
|
||||
SSdbRaw *pCommitRaw = mndDbActionEncode(pNewDb);
|
||||
static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
|
||||
SSdbRaw *pCommitRaw = mndDbActionEncode(pNew);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
|
||||
|
@ -593,14 +569,14 @@ static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
action.epSet = mndGetDnodeEpset(pDnode);
|
||||
mndReleaseDnode(pMnode, pDnode);
|
||||
|
||||
SAlterVnodeReq *pMsg = (SAlterVnodeReq *)mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup);
|
||||
if (pMsg == NULL) return -1;
|
||||
SAlterVnodeReq *pReq = (SAlterVnodeReq *)mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup);
|
||||
if (pReq == NULL) return -1;
|
||||
|
||||
action.pCont = pMsg;
|
||||
action.pCont = pReq;
|
||||
action.contLen = sizeof(SAlterVnodeReq);
|
||||
action.msgType = TDMT_DND_ALTER_VNODE;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
free(pMsg);
|
||||
free(pReq);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -608,7 +584,7 @@ static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) {
|
||||
static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
|
||||
|
@ -617,8 +593,8 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if (pVgroup->dbUid == pNewDb->uid) {
|
||||
if (mndBuildUpdateVgroupAction(pMnode, pTrans, pNewDb, pVgroup) != 0) {
|
||||
if (pVgroup->dbUid == pNew->uid) {
|
||||
if (mndBuildUpdateVgroupAction(pMnode, pTrans, pNew, pVgroup) != 0) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
return -1;
|
||||
|
@ -631,27 +607,27 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbObj *pNewDb) {
|
||||
static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pOld, SDbObj *pNew) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("db:%s, failed to update since %s", pOldDb->name, terrstr());
|
||||
mError("db:%s, failed to update since %s", pOld->name, terrstr());
|
||||
return terrno;
|
||||
}
|
||||
|
||||
mDebug("trans:%d, used to update db:%s", pTrans->id, pOldDb->name);
|
||||
mDebug("trans:%d, used to update db:%s", pTrans->id, pOld->name);
|
||||
|
||||
if (mndSetUpdateDbRedoLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) {
|
||||
if (mndSetUpdateDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) {
|
||||
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
|
||||
goto UPDATE_DB_OVER;
|
||||
}
|
||||
|
||||
if (mndSetUpdateDbCommitLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) {
|
||||
if (mndSetUpdateDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) {
|
||||
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
|
||||
goto UPDATE_DB_OVER;
|
||||
}
|
||||
|
||||
if (mndSetUpdateDbRedoActions(pMnode, pTrans, pOldDb, pNewDb) != 0) {
|
||||
if (mndSetUpdateDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) {
|
||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
||||
goto UPDATE_DB_OVER;
|
||||
}
|
||||
|
@ -668,9 +644,9 @@ UPDATE_DB_OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndProcessAlterDbReq(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
SAlterDbMsg *pAlter = pMsg->rpcMsg.pCont;
|
||||
static int32_t mndProcessAlterDbReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SAlterDbMsg *pAlter = pReq->rpcMsg.pCont;
|
||||
pAlter->totalBlocks = htonl(pAlter->totalBlocks);
|
||||
pAlter->daysToKeep0 = htonl(pAlter->daysToKeep0);
|
||||
pAlter->daysToKeep1 = htonl(pAlter->daysToKeep1);
|
||||
|
@ -697,7 +673,7 @@ static int32_t mndProcessAlterDbReq(SMnodeMsg *pMsg) {
|
|||
|
||||
dbObj.cfgVersion++;
|
||||
dbObj.updateTime = taosGetTimestampMs();
|
||||
code = mndUpdateDb(pMnode, pMsg, pDb, &dbObj);
|
||||
code = mndUpdateDb(pMnode, pReq, pDb, &dbObj);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
|
||||
if (code != 0) {
|
||||
|
@ -757,14 +733,15 @@ static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *
|
|||
action.epSet = mndGetDnodeEpset(pDnode);
|
||||
mndReleaseDnode(pMnode, pDnode);
|
||||
|
||||
SDropVnodeReq *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup);
|
||||
if (pMsg == NULL) return -1;
|
||||
SDropVnodeReq *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup);
|
||||
if (pReq == NULL) return -1;
|
||||
|
||||
action.pCont = pMsg;
|
||||
action.pCont = pReq;
|
||||
action.contLen = sizeof(SCreateVnodeReq);
|
||||
action.msgType = TDMT_DND_DROP_VNODE;
|
||||
action.acceptableCode = TSDB_CODE_DND_VNODE_NOT_DEPLOYED;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
free(pMsg);
|
||||
free(pReq);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -795,35 +772,17 @@ static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *p
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) {
|
||||
static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("db:%s, failed to drop since %s", pDb->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto DROP_DB_OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name);
|
||||
|
||||
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) {
|
||||
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
|
||||
goto DROP_DB_OVER;
|
||||
}
|
||||
|
||||
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) {
|
||||
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
|
||||
goto DROP_DB_OVER;
|
||||
}
|
||||
|
||||
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) {
|
||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
||||
goto DROP_DB_OVER;
|
||||
}
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
goto DROP_DB_OVER;
|
||||
}
|
||||
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
|
||||
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
|
||||
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_DB_OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
|
@ -832,9 +791,9 @@ DROP_DB_OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndProcessDropDbReq(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
SDropDbMsg *pDrop = pMsg->rpcMsg.pCont;
|
||||
static int32_t mndProcessDropDbReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SDropDbMsg *pDrop = pReq->rpcMsg.pCont;
|
||||
|
||||
mDebug("db:%s, start to drop", pDrop->db);
|
||||
|
||||
|
@ -850,7 +809,7 @@ static int32_t mndProcessDropDbReq(SMnodeMsg *pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t code = mndDropDb(pMnode, pMsg, pDb);
|
||||
int32_t code = mndDropDb(pMnode, pReq, pDb);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
|
||||
if (code != 0) {
|
||||
|
@ -861,16 +820,16 @@ static int32_t mndProcessDropDbReq(SMnodeMsg *pMsg) {
|
|||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
static int32_t mndProcessUseDbReq(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SUseDbMsg *pUse = pMsg->rpcMsg.pCont;
|
||||
SUseDbMsg *pUse = pReq->rpcMsg.pCont;
|
||||
pUse->vgVersion = htonl(pUse->vgVersion);
|
||||
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, pUse->db);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
||||
mError("db:%s, failed to process use db msg since %s", pUse->db, terrstr());
|
||||
mError("db:%s, failed to process use db req since %s", pUse->db, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -922,19 +881,19 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pMsg) {
|
|||
pRsp->vgNum = htonl(vindex);
|
||||
pRsp->hashMethod = pDb->hashMethod;
|
||||
|
||||
pMsg->pCont = pRsp;
|
||||
pMsg->contLen = contLen;
|
||||
pReq->pCont = pRsp;
|
||||
pReq->contLen = contLen;
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndProcessSyncDbReq(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
SSyncDbMsg *pSync = pMsg->rpcMsg.pCont;
|
||||
static int32_t mndProcessSyncDbReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SSyncDbMsg *pSync = pReq->rpcMsg.pCont;
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, pSync->db);
|
||||
if (pDb == NULL) {
|
||||
mError("db:%s, failed to process sync db msg since %s", pSync->db, terrstr());
|
||||
mError("db:%s, failed to process sync db req since %s", pSync->db, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -942,12 +901,12 @@ static int32_t mndProcessSyncDbReq(SMnodeMsg *pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndProcessCompactDbReq(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
SCompactDbMsg *pCompact = pMsg->rpcMsg.pCont;
|
||||
static int32_t mndProcessCompactDbReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SCompactDbMsg *pCompact = pReq->rpcMsg.pCont;
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, pCompact->db);
|
||||
if (pDb == NULL) {
|
||||
mError("db:%s, failed to process compact db msg since %s", pCompact->db, terrstr());
|
||||
mError("db:%s, failed to process compact db req since %s", pCompact->db, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -955,8 +914,8 @@ static int32_t mndProcessCompactDbReq(SMnodeMsg *pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndGetDbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
int32_t cols = 0;
|
||||
|
|
|
@ -183,7 +183,7 @@ static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
|
|||
}
|
||||
|
||||
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
|
||||
mTrace("dnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew);
|
||||
mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
|
||||
pOld->updateTime = pNew->updateTime;
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -152,7 +152,7 @@ static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc) {
|
|||
}
|
||||
|
||||
static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOldFunc, SFuncObj *pNewFunc) {
|
||||
mTrace("func:%s, perform update action, old_row:%p new_row:%p", pOldFunc->name, pOldFunc, pNewFunc);
|
||||
mTrace("func:%s, perform update action, old row:%p new row:%p", pOldFunc->name, pOldFunc, pNewFunc);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -208,7 +208,7 @@ static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) {
|
|||
}
|
||||
|
||||
static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew) {
|
||||
mTrace("mnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew);
|
||||
mTrace("mnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
|
||||
pOld->updateTime = pNew->updateTime;
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -155,7 +155,7 @@ static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj) {
|
|||
}
|
||||
|
||||
static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOld, SQnodeObj *pNew) {
|
||||
mTrace("qnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew);
|
||||
mTrace("qnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
|
||||
pOld->updateTime = pNew->updateTime;
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -155,7 +155,7 @@ static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj) {
|
|||
}
|
||||
|
||||
static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOld, SSnodeObj *pNew) {
|
||||
mTrace("snode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew);
|
||||
mTrace("snode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
|
||||
pOld->updateTime = pNew->updateTime;
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -178,7 +178,7 @@ static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
|
|||
}
|
||||
|
||||
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb) {
|
||||
mTrace("stb:%s, perform update action, old_row:%p new_row:%p", pOldStb->name, pOldStb, pNewStb);
|
||||
mTrace("stb:%s, perform update action, old row:%p new row:%p", pOldStb->name, pOldStb, pNewStb);
|
||||
atomic_exchange_32(&pOldStb->updateTime, pNewStb->updateTime);
|
||||
atomic_exchange_32(&pOldStb->version, pNewStb->version);
|
||||
|
||||
|
|
|
@ -91,14 +91,15 @@ static int32_t mndRestoreWal(SMnode *pMnode) {
|
|||
if (sdbWriteFile(pSdb) != 0) {
|
||||
goto WAL_RESTORE_OVER;
|
||||
}
|
||||
}
|
||||
|
||||
if (walBeginSnapshot(pWal, sdbVer) < 0) {
|
||||
goto WAL_RESTORE_OVER;
|
||||
}
|
||||
if (walBeginSnapshot(pWal, sdbVer) < 0) {
|
||||
goto WAL_RESTORE_OVER;
|
||||
}
|
||||
|
||||
if (walEndSnapshot(pWal) < 0) {
|
||||
goto WAL_RESTORE_OVER;
|
||||
}
|
||||
|
||||
if (walEndSnapshot(pWal) < 0) {
|
||||
goto WAL_RESTORE_OVER;
|
||||
}
|
||||
|
||||
code = 0;
|
||||
|
@ -181,4 +182,4 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
|
|||
bool mndIsMaster(SMnode *pMnode) {
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
return pMgmt->state == TAOS_SYNC_STATE_LEADER;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -192,7 +192,7 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
|
|||
}
|
||||
|
||||
static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) {
|
||||
mTrace("user:%s, perform update action, old_row:%p new_row:%p", pOld->user, pOld, pNew);
|
||||
mTrace("user:%s, perform update action, old row:%p new row:%p", pOld->user, pOld, pNew);
|
||||
memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN);
|
||||
pOld->updateTime = pNew->updateTime;
|
||||
return 0;
|
||||
|
|
|
@ -165,7 +165,7 @@ static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) {
|
|||
}
|
||||
|
||||
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup) {
|
||||
mTrace("vgId:%d, perform update action, old_row:%p new_row:%p", pOldVgroup->vgId, pOldVgroup, pNewVgroup);
|
||||
mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOldVgroup->vgId, pOldVgroup, pNewVgroup);
|
||||
pOldVgroup->updateTime = pNewVgroup->updateTime;
|
||||
pOldVgroup->version = pNewVgroup->version;
|
||||
pOldVgroup->hashBegin = pNewVgroup->hashBegin;
|
||||
|
@ -189,7 +189,7 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
|
|||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
||||
SCreateVnodeReq *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) {
|
||||
SCreateVnodeReq *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) {
|
||||
SCreateVnodeReq *pCreate = calloc(1, sizeof(SCreateVnodeReq));
|
||||
if (pCreate == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -248,7 +248,7 @@ SCreateVnodeReq *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb
|
|||
return pCreate;
|
||||
}
|
||||
|
||||
SDropVnodeReq *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) {
|
||||
SDropVnodeReq *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) {
|
||||
SDropVnodeReq *pDrop = calloc(1, sizeof(SDropVnodeReq));
|
||||
if (pDrop == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
|
|
@ -112,6 +112,9 @@ public:
|
|||
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const {
|
||||
// todo
|
||||
vgInfo->vgId = 1;
|
||||
vgInfo->numOfEps = 1;
|
||||
vgInfo->epAddr[0].port = 6030;
|
||||
strcpy(vgInfo->epAddr[0].fqdn, "node1");
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -8,7 +8,8 @@ target_include_directories(
|
|||
|
||||
target_link_libraries(
|
||||
planner
|
||||
PRIVATE os util catalog cjson parser transport function qcom
|
||||
PRIVATE os util catalog cjson parser function qcom
|
||||
PUBLIC transport
|
||||
)
|
||||
|
||||
if(${BUILD_TEST})
|
||||
|
|
|
@ -234,7 +234,7 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
|
|||
|
||||
static void vgroupInfoToEpSet(const SVgroupInfo* vg, SQueryNodeAddr* execNode) {
|
||||
execNode->nodeId = vg->vgId;
|
||||
execNode->inUse = 0; // todo
|
||||
execNode->inUse = vg->inUse;
|
||||
execNode->numOfEps = vg->numOfEps;
|
||||
for (int8_t i = 0; i < vg->numOfEps; ++i) {
|
||||
execNode->epAddr[i] = vg->epAddr[i];
|
||||
|
|
|
@ -62,7 +62,7 @@ static bool fromObjectWithAlloc(const cJSON* json, const char* name, FFromJson f
|
|||
return func(jObj, *obj);
|
||||
}
|
||||
|
||||
static bool addArray(cJSON* json, const char* name, FToJson func, const SArray* array) {
|
||||
static bool addTarray(cJSON* json, const char* name, FToJson func, const SArray* array, bool isPoint) {
|
||||
size_t size = (NULL == array) ? 0 : taosArrayGetSize(array);
|
||||
if (size > 0) {
|
||||
cJSON* jArray = cJSON_AddArrayToObject(json, name);
|
||||
|
@ -70,7 +70,7 @@ static bool addArray(cJSON* json, const char* name, FToJson func, const SArray*
|
|||
return false;
|
||||
}
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
if (!addItem(jArray, func, taosArrayGetP(array, i))) {
|
||||
if (!addItem(jArray, func, isPoint ? taosArrayGetP(array, i) : taosArrayGet(array, i))) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -78,11 +78,19 @@ static bool addArray(cJSON* json, const char* name, FToJson func, const SArray*
|
|||
return true;
|
||||
}
|
||||
|
||||
static bool fromArray(const cJSON* json, const char* name, FFromJson func, SArray** array, int32_t itemSize) {
|
||||
static bool addInlineArray(cJSON* json, const char* name, FToJson func, const SArray* array) {
|
||||
return addTarray(json, name, func, array, false);
|
||||
}
|
||||
|
||||
static bool addArray(cJSON* json, const char* name, FToJson func, const SArray* array) {
|
||||
return addTarray(json, name, func, array, true);
|
||||
}
|
||||
|
||||
static bool fromTarray(const cJSON* json, const char* name, FFromJson func, SArray** array, int32_t itemSize, bool isPoint) {
|
||||
const cJSON* jArray = cJSON_GetObjectItem(json, name);
|
||||
int32_t size = (NULL == jArray ? 0 : cJSON_GetArraySize(jArray));
|
||||
if (size > 0) {
|
||||
*array = taosArrayInit(size, POINTER_BYTES);
|
||||
*array = taosArrayInit(size, isPoint ? POINTER_BYTES : itemSize);
|
||||
if (NULL == *array) {
|
||||
return false;
|
||||
}
|
||||
|
@ -92,11 +100,19 @@ static bool fromArray(const cJSON* json, const char* name, FFromJson func, SArra
|
|||
if (NULL == item || !func(cJSON_GetArrayItem(jArray, i), item)) {
|
||||
return false;
|
||||
}
|
||||
taosArrayPush(*array, &item);
|
||||
taosArrayPush(*array, isPoint ? &item : item);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
static bool fromInlineArray(const cJSON* json, const char* name, FFromJson func, SArray** array, int32_t itemSize) {
|
||||
return fromTarray(json, name, func, array, itemSize, false);
|
||||
}
|
||||
|
||||
static bool fromArray(const cJSON* json, const char* name, FFromJson func, SArray** array, int32_t itemSize) {
|
||||
return fromTarray(json, name, func, array, itemSize, true);
|
||||
}
|
||||
|
||||
static bool addRawArray(cJSON* json, const char* name, FToJson func, const void* array, int32_t itemSize, int32_t size) {
|
||||
if (size > 0) {
|
||||
cJSON* jArray = cJSON_AddArrayToObject(json, name);
|
||||
|
@ -556,6 +572,32 @@ static bool epAddrFromJson(const cJSON* json, void* obj) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static const char* jkNodeAddrId = "NodeId";
|
||||
static const char* jkNodeAddrInUse = "InUse";
|
||||
static const char* jkNodeAddrEpAddrs = "EpAddrs";
|
||||
|
||||
static bool nodeAddrToJson(const void* obj, cJSON* json) {
|
||||
const SQueryNodeAddr* ep = (const SQueryNodeAddr*)obj;
|
||||
bool res = cJSON_AddNumberToObject(json, jkNodeAddrId, ep->nodeId);
|
||||
if (res) {
|
||||
res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, ep->inUse);
|
||||
}
|
||||
if (res) {
|
||||
res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, ep->epAddr, ep->numOfEps, sizeof(SEpAddrMsg));
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
static bool nodeAddrFromJson(const cJSON* json, void* obj) {
|
||||
SQueryNodeAddr* ep = (SQueryNodeAddr*)obj;
|
||||
ep->nodeId = getNumber(json, jkNodeAddrId);
|
||||
ep->inUse = getNumber(json, jkNodeAddrInUse);
|
||||
int32_t numOfEps = 0;
|
||||
bool res = fromRawArray(json, jkNodeAddrEpAddrs, nodeAddrFromJson, &ep->epAddr, sizeof(SEpAddrMsg), &numOfEps);
|
||||
ep->numOfEps = numOfEps;
|
||||
return res;
|
||||
}
|
||||
|
||||
static const char* jkExchangeNodeSrcTemplateId = "SrcTemplateId";
|
||||
static const char* jkExchangeNodeSrcEndPoints = "SrcEndPoints";
|
||||
|
||||
|
@ -563,7 +605,7 @@ static bool exchangeNodeToJson(const void* obj, cJSON* json) {
|
|||
const SExchangePhyNode* exchange = (const SExchangePhyNode*)obj;
|
||||
bool res = cJSON_AddNumberToObject(json, jkExchangeNodeSrcTemplateId, exchange->srcTemplateId);
|
||||
if (res) {
|
||||
res = addArray(json, jkExchangeNodeSrcEndPoints, epAddrToJson, exchange->pSrcEndPoints);
|
||||
res = addInlineArray(json, jkExchangeNodeSrcEndPoints, nodeAddrToJson, exchange->pSrcEndPoints);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
@ -571,7 +613,7 @@ static bool exchangeNodeToJson(const void* obj, cJSON* json) {
|
|||
static bool exchangeNodeFromJson(const cJSON* json, void* obj) {
|
||||
SExchangePhyNode* exchange = (SExchangePhyNode*)obj;
|
||||
exchange->srcTemplateId = getNumber(json, jkExchangeNodeSrcTemplateId);
|
||||
return fromArray(json, jkExchangeNodeSrcEndPoints, epAddrFromJson, &exchange->pSrcEndPoints, sizeof(SEpAddrMsg));
|
||||
return fromInlineArray(json, jkExchangeNodeSrcEndPoints, nodeAddrFromJson, &exchange->pSrcEndPoints, sizeof(SQueryNodeAddr));
|
||||
}
|
||||
|
||||
static bool specificPhyNodeToJson(const void* obj, cJSON* json) {
|
||||
|
@ -803,7 +845,6 @@ static cJSON* subplanToJson(const SSubplan* subplan) {
|
|||
if (res) {
|
||||
res = addObject(jSubplan, jkSubplanDataSink, dataSinkToJson, subplan->pDataSink);
|
||||
}
|
||||
|
||||
if (!res) {
|
||||
cJSON_Delete(jSubplan);
|
||||
return NULL;
|
||||
|
|
|
@ -15,5 +15,5 @@ TARGET_INCLUDE_DIRECTORIES(
|
|||
|
||||
TARGET_LINK_LIBRARIES(
|
||||
queryUtilTest
|
||||
PUBLIC os util gtest qcom common
|
||||
PUBLIC os util gtest qcom common transport
|
||||
)
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include <iostream>
|
||||
#include "tmsg.h"
|
||||
#include "query.h"
|
||||
#include "trpc.h"
|
||||
#pragma GCC diagnostic ignored "-Wwrite-strings"
|
||||
|
||||
#pragma GCC diagnostic ignored "-Wunused-function"
|
||||
|
@ -80,4 +81,4 @@ TEST(testCase, error_in_async_test) {
|
|||
taosAsyncExec(testPrintError, p, &code);
|
||||
usleep(1000);
|
||||
printf("Error code:%d after asynchronously exec function\n", code);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -131,6 +131,7 @@ int walMetaDeserialize(SWal* pWal, const char* bytes);
|
|||
|
||||
// seek section
|
||||
int walChangeFile(SWal* pWal, int64_t ver);
|
||||
int walChangeFileToLast(SWal* pWal);
|
||||
// seek section end
|
||||
|
||||
int64_t walGetSeq();
|
||||
|
|
|
@ -184,6 +184,7 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) {
|
|||
}
|
||||
taosArraySetSize(pArray, sz);
|
||||
pWal->fileInfoSet = pArray;
|
||||
pWal->writeCur = sz - 1;
|
||||
cJSON_Delete(pRoot);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -232,7 +232,8 @@ static int32_t walCreateThread() {
|
|||
|
||||
if (pthread_create(&tsWal.thread, &thAttr, walThreadFunc, NULL) != 0) {
|
||||
wError("failed to create wal thread since %s", strerror(errno));
|
||||
return TAOS_SYSTEM_ERROR(errno);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pthread_attr_destroy(&thAttr);
|
||||
|
|
|
@ -244,6 +244,7 @@ int walRoll(SWal *pWal) {
|
|||
pWal->writeIdxTfd = idxTfd;
|
||||
pWal->writeLogTfd = logTfd;
|
||||
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
|
||||
ASSERT(pWal->writeCur >= 0);
|
||||
|
||||
pWal->lastRollSeq = walGetSeq();
|
||||
return 0;
|
||||
|
@ -253,6 +254,7 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
|||
SWalIdxEntry entry = {.ver = ver, .offset = offset};
|
||||
int size = tfWrite(pWal->writeIdxTfd, &entry, sizeof(SWalIdxEntry));
|
||||
if (size != sizeof(SWalIdxEntry)) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
// TODO truncate
|
||||
return -1;
|
||||
}
|
||||
|
@ -286,7 +288,13 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
|
|||
}
|
||||
/*if (!tfValid(pWal->writeLogTfd)) return -1;*/
|
||||
|
||||
ASSERT(pWal->writeCur >= 0);
|
||||
|
||||
pthread_mutex_lock(&pWal->mutex);
|
||||
if (pWal->writeIdxTfd == -1 || pWal->writeLogTfd == -1) {
|
||||
walChangeFileToLast(pWal);
|
||||
}
|
||||
|
||||
pWal->writeHead.head.version = index;
|
||||
|
||||
int64_t offset = walGetCurFileOffset(pWal);
|
||||
|
@ -308,6 +316,7 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
|
|||
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
|
||||
strerror(errno));
|
||||
}
|
||||
|
||||
code = walWriteIndex(pWal, index, offset);
|
||||
if (code != 0) {
|
||||
// TODO
|
||||
|
|
|
@ -812,7 +812,9 @@ FORCE_INLINE int32_t taosHashGetKey(void *data, void** key, size_t* keyLen) {
|
|||
|
||||
SHashNode * node = GET_HASH_PNODE(data);
|
||||
*key = GET_HASH_NODE_KEY(node);
|
||||
*keyLen = node->keyLen;
|
||||
if (keyLen) {
|
||||
*keyLen = node->keyLen;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue