Merge branch '3.0' into feature/vnode

This commit is contained in:
Hongze Cheng 2022-01-07 05:43:37 +00:00
commit c9e1dd4e8b
10 changed files with 415 additions and 388 deletions

View File

@ -26,6 +26,7 @@ extern "C" {
#include "tarray.h" #include "tarray.h"
#include "tcoding.h" #include "tcoding.h"
#include "tdataformat.h" #include "tdataformat.h"
#include "thash.h"
#include "tlist.h" #include "tlist.h"
/* ------------------------ MESSAGE DEFINITIONS ------------------------ */ /* ------------------------ MESSAGE DEFINITIONS ------------------------ */
@ -132,6 +133,73 @@ typedef enum _mgmt_table {
#define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC) #define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC)
#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) #define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0)
typedef struct SKlv {
int32_t keyLen;
int32_t valueLen;
void* key;
void* value;
} SKlv;
static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pKlv->keyLen);
tlen += taosEncodeFixedI32(buf, pKlv->valueLen);
tlen += taosEncodeBinary(buf, pKlv->key, pKlv->keyLen);
tlen += taosEncodeBinary(buf, pKlv->value, pKlv->valueLen);
return tlen;
}
static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) {
buf = taosDecodeFixedI32(buf, &pKlv->keyLen);
buf = taosDecodeFixedI32(buf, &pKlv->valueLen);
buf = taosDecodeBinary(buf, &pKlv->key, pKlv->keyLen);
buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen);
return buf;
}
typedef struct SClientHbKey {
int32_t connId;
int32_t hbType;
} SClientHbKey;
static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pKey->connId);
tlen += taosEncodeFixedI32(buf, pKey->hbType);
return tlen;
}
static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) {
buf = taosDecodeFixedI32(buf, &pKey->connId);
buf = taosDecodeFixedI32(buf, &pKey->hbType);
return buf;
}
typedef struct SClientHbReq {
SClientHbKey connKey;
SHashObj* info; // hash<Slv.key, Sklv>
} SClientHbReq;
typedef struct SClientHbBatchReq {
int64_t reqId;
SArray* reqs; // SArray<SClientHbReq>
} SClientHbBatchReq;
int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq);
void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq);
typedef struct SClientHbRsp {
SClientHbKey connKey;
int32_t status;
int32_t bodyLen;
void* body;
} SClientHbRsp;
typedef struct SClientHbBatchRsp {
int64_t reqId;
int64_t rspId;
SArray* rsps; // SArray<SClientHbRsp>
} SClientHbBatchRsp;
typedef struct SBuildTableMetaInput { typedef struct SBuildTableMetaInput {
int32_t vgId; int32_t vgId;
char* dbName; char* dbName;
@ -1150,7 +1218,7 @@ static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribe
tlen += taosEncodeFixedI64(buf, pReq->consumerId); tlen += taosEncodeFixedI64(buf, pReq->consumerId);
tlen += taosEncodeString(buf, pReq->consumerGroup); tlen += taosEncodeString(buf, pReq->consumerGroup);
for(int i = 0; i < pReq->topicNum; i++) { for (int i = 0; i < pReq->topicNum; i++) {
tlen += taosEncodeString(buf, (char*)taosArrayGetP(pReq->topicNames, i)); tlen += taosEncodeString(buf, (char*)taosArrayGetP(pReq->topicNames, i));
} }
return tlen; return tlen;
@ -1161,7 +1229,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq
buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeFixedI64(buf, &pReq->consumerId);
buf = taosDecodeString(buf, &pReq->consumerGroup); buf = taosDecodeString(buf, &pReq->consumerGroup);
pReq->topicNames = taosArrayInit(pReq->topicNum, sizeof(void*)); pReq->topicNames = taosArrayInit(pReq->topicNum, sizeof(void*));
for(int i = 0; i < pReq->topicNum; i++) { for (int i = 0; i < pReq->topicNum; i++) {
char* name = NULL; char* name = NULL;
buf = taosDecodeString(buf, &name); buf = taosDecodeString(buf, &name);
taosArrayPush(pReq->topicNames, &name); taosArrayPush(pReq->topicNames, &name);
@ -1183,7 +1251,7 @@ typedef struct {
static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) { static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) {
int tlen = 0; int tlen = 0;
tlen += taosEncodeFixedI32(buf, pRsp->topicNum); tlen += taosEncodeFixedI32(buf, pRsp->topicNum);
for(int i = 0; i < pRsp->topicNum; i++) { for (int i = 0; i < pRsp->topicNum; i++) {
tlen += taosEncodeFixedI32(buf, pRsp->topics[i].vgId); tlen += taosEncodeFixedI32(buf, pRsp->topics[i].vgId);
tlen += taosEncodeFixedI64(buf, pRsp->topics[i].topicId); tlen += taosEncodeFixedI64(buf, pRsp->topics[i].topicId);
tlen += taosEncodeSEpSet(buf, &pRsp->topics[i].epSet); tlen += taosEncodeSEpSet(buf, &pRsp->topics[i].epSet);
@ -1193,7 +1261,7 @@ static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribe
static FORCE_INLINE void* tDeserializeSCMSubscribeRsp(void* buf, SCMSubscribeRsp* pRsp) { static FORCE_INLINE void* tDeserializeSCMSubscribeRsp(void* buf, SCMSubscribeRsp* pRsp) {
buf = taosDecodeFixedI32(buf, &pRsp->topicNum); buf = taosDecodeFixedI32(buf, &pRsp->topicNum);
for(int i = 0; i < pRsp->topicNum; i++) { for (int i = 0; i < pRsp->topicNum; i++) {
buf = taosDecodeFixedI32(buf, &pRsp->topics[i].vgId); buf = taosDecodeFixedI32(buf, &pRsp->topics[i].vgId);
buf = taosDecodeFixedI64(buf, &pRsp->topics[i].topicId); buf = taosDecodeFixedI64(buf, &pRsp->topics[i].topicId);
buf = taosDecodeSEpSet(buf, &pRsp->topics[i].epSet); buf = taosDecodeSEpSet(buf, &pRsp->topics[i].epSet);

View File

@ -20,118 +20,21 @@
typedef enum { typedef enum {
mq = 0, mq = 0,
// type can be added here
//
HEARTBEAT_TYPE_MAX HEARTBEAT_TYPE_MAX
} EHbType; } EHbType;
typedef struct SKlv { typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq);
int32_t keyLen; typedef int32_t (*FGetConnInfo)(SClientHbKey connKey, void* param);
int32_t valueLen;
void* key;
void* value;
} SKlv;
static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pKlv->keyLen);
tlen += taosEncodeFixedI32(buf, pKlv->valueLen);
tlen += taosEncodeBinary(buf, pKlv->key, pKlv->keyLen);
tlen += taosEncodeBinary(buf, pKlv->value, pKlv->valueLen);
return tlen;
}
static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) {
buf = taosDecodeFixedI32(buf, &pKlv->keyLen);
buf = taosDecodeFixedI32(buf, &pKlv->valueLen);
buf = taosDecodeBinary(buf, &pKlv->key, pKlv->keyLen);
buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen);
return buf;
}
typedef struct SClientHbKey {
int32_t connId;
int32_t hbType;
} SClientHbKey;
static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pKey->connId);
tlen += taosEncodeFixedI32(buf, pKey->hbType);
return tlen;
}
static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) {
buf = taosDecodeFixedI32(buf, &pKey->connId);
buf = taosDecodeFixedI32(buf, &pKey->hbType);
return buf;
}
typedef struct SClientHbReq {
SClientHbKey hbKey;
SHashObj* info; // hash<Sklv>
} SClientHbReq;
static FORCE_INLINE int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq) {
int tlen = 0;
tlen += taosEncodeSClientHbKey(buf, &pReq->hbKey);
void* pIter = NULL;
void* data;
SKlv klv;
data = taosHashIterate(pReq->info, pIter);
while (data != NULL) {
taosHashGetKey(data, &klv.key, (size_t*)&klv.keyLen);
klv.valueLen = taosHashGetDataLen(data);
klv.value = data;
taosEncodeSKlv(buf, &klv);
data = taosHashIterate(pReq->info, pIter);
}
return tlen;
}
static FORCE_INLINE void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq) {
ASSERT(pReq->info != NULL);
buf = taosDecodeSClientHbKey(buf, &pReq->hbKey);
//TODO: error handling
if(pReq->info == NULL) {
pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
}
SKlv klv;
buf = taosDecodeSKlv(buf, &klv);
taosHashPut(pReq->info, klv.key, klv.keyLen, klv.value, klv.valueLen);
return buf;
}
typedef struct SClientHbBatchReq {
int64_t reqId;
SArray* reqs; // SArray<SClientHbReq>
} SClientHbBatchReq;
typedef struct SClientHbHandleResult {
} SClientHbHandleResult;
typedef struct SClientHbRsp {
int32_t connId;
int32_t hbType;
} SClientHbRsp;
typedef struct SClientHbBatchRsp {
int64_t reqId;
int64_t rspId;
SArray* rsps; // SArray<SClientHbRsp>
} SClientHbBatchRsp;
typedef int32_t (*FHbRspHandle)(SClientHbReq* pReq);
typedef int32_t (*FGetConnInfo)(int32_t conn, void* self);
typedef struct SClientHbMgr { typedef struct SClientHbMgr {
int8_t inited; int8_t inited;
int32_t reportInterval; // unit ms int32_t reportInterval; // unit ms
int32_t stats; int32_t stats;
SRWLatch lock; SRWLatch lock;
SHashObj* info; //hash<SClientHbKey, SClientHbReq> SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
SHashObj* getInfoFuncs; // hash<SClientHbKey, FGetConnInfo>
FHbRspHandle handle[HEARTBEAT_TYPE_MAX]; FHbRspHandle handle[HEARTBEAT_TYPE_MAX];
// input queue // input queue
} SClientHbMgr; } SClientHbMgr;
@ -140,9 +43,11 @@ static SClientHbMgr clientHbMgr = {0};
int hbMgrInit(); int hbMgrInit();
void hbMgrCleanUp(); void hbMgrCleanUp();
int hbHandleRsp(void* hbMsg);
int registerConn(int32_t connId, FGetConnInfo func, FHbRspHandle rspHandle);
int registerHbRspHandle(int32_t connId, int32_t hbType, FHbRspHandle rspHandle); int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func);
int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);
int HbAddConnInfo(int32_t connId, void* key, void* value, int32_t keyLen, int32_t valueLen);

View File

@ -15,45 +15,62 @@
#include "clientHb.h" #include "clientHb.h"
static int32_t mqHbRspHandle(SClientHbReq* pReq) { static int32_t mqHbRspHandle(SClientHbRsp* pReq) {
return 0; return 0;
} }
uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) {
return 0;
}
static void hbMgrInitMqHbFunc() {
clientHbMgr.handle[mq] = mqHbRspHandle;
}
int hbMgrInit() { int hbMgrInit() {
//init once //init once
// int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
//init lock if (old == 1) return 0;
//
//init handle funcs
clientHbMgr.handle[mq] = mqHbRspHandle;
//init stat
clientHbMgr.stats = 0;
//init config //init config
clientHbMgr.reportInterval = 1500; clientHbMgr.reportInterval = 1500;
//init stat
clientHbMgr.stats = 0;
//init lock
taosInitRWLatch(&clientHbMgr.lock);
//init handle funcs
hbMgrInitMqHbFunc();
//init hash info //init hash info
// clientHbMgr.activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
//init getInfoFunc
clientHbMgr.getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
return 0; return 0;
} }
void hbMgrCleanUp() { void hbMgrCleanUp() {
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
if (old == 0) return;
taosHashCleanup(clientHbMgr.activeInfo);
taosHashCleanup(clientHbMgr.getInfoFuncs);
} }
int registerConn(int32_t connId, FGetConnInfo func, FHbRspHandle rspHandle) { int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func) {
return 0; return 0;
} }
int registerHbRspHandle(int32_t connId, int32_t hbType, FHbRspHandle rspHandle) { int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) {
return 0;
}
int HbAddConnInfo(int32_t connId, void* key, void* value, int32_t keyLen, int32_t valueLen) {
//lock //lock
//find req by connection id //find req by connection id
SClientHbReq* data = taosHashGet(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey));
ASSERT(data != NULL);
taosHashPut(data->info, key, keyLen, value, valueLen);
//unlock //unlock
return 0; return 0;

View File

@ -140,7 +140,7 @@ int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj*
(*pRequest)->sqlstr[sqlLen] = 0; (*pRequest)->sqlstr[sqlLen] = 0;
(*pRequest)->sqlLen = sqlLen; (*pRequest)->sqlLen = sqlLen;
tscDebugL("0x%"PRIx64" SQL: %s, reqId:0x"PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId); tscDebugL("0x%"PRIx64" SQL: %s, reqId:0x%"PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -107,7 +107,7 @@ TEST(testCase, show_user_Test) {
int32_t numOfFields = taos_num_fields(pRes); int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0}; char str[512] = {0};
while((pRow = taos_fetch_row(pRes)) != NULL) { while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields); int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str); printf("%s\n", str);
} }
@ -140,7 +140,7 @@ TEST(testCase, show_db_Test) {
int32_t numOfFields = taos_num_fields(pRes); int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0}; char str[512] = {0};
while((pRow = taos_fetch_row(pRes)) != NULL) { while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields); int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str); printf("%s\n", str);
} }
@ -228,29 +228,29 @@ TEST(testCase, use_db_test) {
taos_close(pConn); taos_close(pConn);
} }
//TEST(testCase, drop_db_test) { // TEST(testCase, drop_db_test) {
//// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
//// assert(pConn != NULL); // assert(pConn != NULL);
//// //
//// showDB(pConn); // showDB(pConn);
//// //
//// TAOS_RES* pRes = taos_query(pConn, "drop database abc1"); // TAOS_RES* pRes = taos_query(pConn, "drop database abc1");
//// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
//// printf("failed to drop db, reason:%s\n", taos_errstr(pRes)); // printf("failed to drop db, reason:%s\n", taos_errstr(pRes));
//// } // }
//// taos_free_result(pRes); // taos_free_result(pRes);
//// //
//// showDB(pConn); // showDB(pConn);
//// //
//// pRes = taos_query(pConn, "create database abc1"); // pRes = taos_query(pConn, "create database abc1");
//// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
//// printf("create to drop db, reason:%s\n", taos_errstr(pRes)); // printf("create to drop db, reason:%s\n", taos_errstr(pRes));
//// } // }
//// taos_free_result(pRes); // taos_free_result(pRes);
//// taos_close(pConn); // taos_close(pConn);
//} //}
TEST(testCase, create_stable_Test) { TEST(testCase, create_stable_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
@ -281,188 +281,100 @@ TEST(testCase, use_db_test) {
taos_close(pConn); taos_close(pConn);
} }
//TEST(testCase, create_table_Test) { TEST(testCase, create_table_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create table tm0(ts timestamp, k int)");
// taos_free_result(pRes);
//
// taos_close(pConn);
//}
//TEST(testCase, create_ctable_Test) { TAOS_RES* pRes = taos_query(pConn, "use abc1");
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); taos_free_result(pRes);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) {
// printf("failed to use db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create table tm0 using st1 tags(1)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
//TEST(testCase, show_stable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) {
// printf("failed to use db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "show stables");
// if (taos_errno(pRes) != 0) {
// printf("failed to show stables, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
//
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//TEST(testCase, show_vgroup_Test) { pRes = taos_query(pConn, "create table tm0(ts timestamp, k int)");
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); taos_free_result(pRes);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) {
// printf("failed to use db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "show vgroups");
// if (taos_errno(pRes) != 0) {
// printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
//
// TAOS_ROW pRow = NULL;
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// taos_free_result(pRes);
//
// taos_close(pConn);
//}
//TEST(testCase, drop_stable_Test) { taos_close(pConn);
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); }
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "create database abc1");
// if (taos_errno(pRes) != 0) {
// printf("error in creating db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) {
// printf("error in using db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "drop stable st1");
// if (taos_errno(pRes) != 0) {
// printf("failed to drop stable, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//TEST(testCase, create_topic_Test) { TEST(testCase, create_ctable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "create database abc1");
// if (taos_errno(pRes) != 0) {
// printf("error in create db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) {
// printf("error in use db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)");
// if (taos_errno(pRes) != 0) {
// printf("error in create stable, reason:%s\n", taos_errstr(pRes));
// }
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// ASSERT_TRUE(pFields == NULL);
//
// int32_t numOfFields = taos_num_fields(pRes);
// ASSERT_EQ(numOfFields, 0);
//
// taos_free_result(pRes);
//
// char* sql = "select * from st1";
// tmq_create_topic(pConn, "test_topic_1", sql, strlen(sql));
// taos_close(pConn);
//}
//TEST(testCase, show_table_Test) { TAOS_RES* pRes = taos_query(pConn, "use abc1");
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); if (taos_errno(pRes) != 0) {
// assert(pConn != NULL); printf("failed to use db, reason:%s\n", taos_errstr(pRes));
// }
// TAOS_RES* pRes = taos_query(pConn, "use abc1"); taos_free_result(pRes);
// taos_free_result(pRes);
// pRes = taos_query(pConn, "create table tm0 using st1 tags(1)");
// pRes = taos_query(pConn, "show tables"); if (taos_errno(pRes) != 0) {
// if (taos_errno(pRes) != 0) { printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes));
// printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); }
// taos_free_result(pRes);
// ASSERT_TRUE(false); taos_free_result(pRes);
// } taos_close(pConn);
// }
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes); TEST(testCase, show_stable_Test) {
// int32_t numOfFields = taos_num_fields(pRes); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
// char str[512] = {0};
// while((pRow = taos_fetch_row(pRes)) != NULL) { TAOS_RES* pRes = taos_query(pConn, "use abc1");
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); if (taos_errno(pRes) != 0) {
// printf("%s\n", str); printf("failed to use db, reason:%s\n", taos_errstr(pRes));
// } }
// taos_free_result(pRes);
// taos_free_result(pRes);
// taos_close(pConn); pRes = taos_query(pConn, "show stables");
//} if (taos_errno(pRes) != 0) {
printf("failed to show stables, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, show_vgroup_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("failed to use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "show vgroups");
if (taos_errno(pRes) != 0) {
printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, create_multiple_tables) { TEST(testCase, create_multiple_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -498,32 +410,118 @@ TEST(testCase, create_multiple_tables) {
int32_t numOfFields = taos_num_fields(pRes); int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0}; char str[512] = {0};
while((pRow = taos_fetch_row(pRes)) != NULL) { while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields); int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str); printf("%s\n", str);
} }
taos_free_result(pRes); taos_free_result(pRes);
for(int32_t i = 0; i < 200000; ++i) { for (int32_t i = 0; i < 20; ++i) {
char sql[512] = {0}; char sql[512] = {0};
snprintf(sql, tListLen(sql), "create table t_x_%d using st1 tags(2)", i); snprintf(sql, tListLen(sql),
"create table t_x_%d using st1 tags(2) t_x_%d using st1 tags(5) t_x_%d using st1 tags(911)", i,
(i + 1) * 30, (i + 2) * 40);
TAOS_RES* pres = taos_query(pConn, sql); TAOS_RES* pres = taos_query(pConn, sql);
if (taos_errno(pres) != 0) { if (taos_errno(pres) != 0) {
printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres));
} }
printf("%d\n", i);
taos_free_result(pres); taos_free_result(pres);
} }
taos_close(pConn); taos_close(pConn);
} }
TEST(testCase, generated_request_id_test) { TEST(testCase, show_table_Test) {
SHashObj *phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
for(int32_t i = 0; i < 50000; ++i) { TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes);
pRes = taos_query(pConn, "show tables");
if (taos_errno(pRes) != 0) {
printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, drop_stable_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "create database abc1");
if (taos_errno(pRes) != 0) {
printf("error in creating db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in using db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop stable st1");
if (taos_errno(pRes) != 0) {
printf("failed to drop stable, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
taos_close(pConn);
}
// TEST(testCase, create_topic_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "create database abc1");
// if (taos_errno(pRes) != 0) {
// printf("error in create db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) {
// printf("error in use db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)");
// if (taos_errno(pRes) != 0) {
// printf("error in create stable, reason:%s\n", taos_errstr(pRes));
// }
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// ASSERT_TRUE(pFields == NULL);
//
// int32_t numOfFields = taos_num_fields(pRes);
// ASSERT_EQ(numOfFields, 0);
//
// taos_free_result(pRes);
//
// char* sql = "select * from st1";
// tmq_create_topic(pConn, "test_topic_1", sql, strlen(sql));
// taos_close(pConn);
//}
TEST(testCase, generated_request_id_test) {
SHashObj* phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
for (int32_t i = 0; i < 50000; ++i) {
uint64_t v = generateRequestId(); uint64_t v = generateRequestId();
void* result = taosHashGet(phash, &v, sizeof(v)); void* result = taosHashGet(phash, &v, sizeof(v));
if (result != nullptr) { if (result != nullptr) {
@ -536,7 +534,7 @@ TEST(testCase, generated_request_id_test) {
taosHashCleanup(phash); taosHashCleanup(phash);
} }
//TEST(testCase, projection_query_tables) { // TEST(testCase, projection_query_tables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_EQ(pConn, nullptr); // ASSERT_EQ(pConn, nullptr);
// //
@ -563,4 +561,3 @@ TEST(testCase, generated_request_id_test) {
// taos_free_result(pRes); // taos_free_result(pRes);
// taos_close(pConn); // taos_close(pConn);
//} //}

View File

@ -910,7 +910,7 @@ static void doInitGlobalConfig(void) {
cfg.option = "tsdbDebugFlag"; cfg.option = "tsdbDebugFlag";
cfg.ptr = &tsdbDebugFlag; cfg.ptr = &tsdbDebugFlag;
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG | TSDB_CFG_CTYPE_B_CLIENT; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG;
cfg.minValue = 0; cfg.minValue = 0;
cfg.maxValue = 255; cfg.maxValue = 255;
cfg.ptrLength = 0; cfg.ptrLength = 0;
@ -927,6 +927,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosAddConfigOption(cfg); taosAddConfigOption(cfg);
cfg.option = "ctgDebugFlag";
cfg.ptr = &ctgDebugFlag;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 255;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosAddConfigOption(cfg);
cfg.option = "enableRecordSql"; cfg.option = "enableRecordSql";
cfg.ptr = &tsTscEnableRecordSql; cfg.ptr = &tsTscEnableRecordSql;
cfg.valType = TAOS_CFG_VTYPE_INT8; cfg.valType = TAOS_CFG_VTYPE_INT8;

View File

@ -83,6 +83,38 @@ SMemRow tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
} }
return row; return row;
} }
int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) {
int tlen = 0;
tlen += taosEncodeSClientHbKey(buf, &pReq->connKey);
void *pIter = NULL;
void *data;
SKlv klv;
data = taosHashIterate(pReq->info, pIter);
while (data != NULL) {
taosHashGetKey(data, &klv.key, (size_t *)&klv.keyLen);
klv.valueLen = taosHashGetDataLen(data);
klv.value = data;
taosEncodeSKlv(buf, &klv);
data = taosHashIterate(pReq->info, pIter);
}
return tlen;
}
void *tDeserializeClientHbReq(void *buf, SClientHbReq *pReq) {
ASSERT(pReq->info != NULL);
buf = taosDecodeSClientHbKey(buf, &pReq->connKey);
// TODO: error handling
if (pReq->info == NULL) {
pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
}
SKlv klv;
buf = taosDecodeSKlv(buf, &klv);
taosHashPut(pReq->info, klv.key, klv.keyLen, klv.value, klv.valueLen);
return buf;
} }
int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {

View File

@ -310,18 +310,8 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName
vgInfo = NULL; vgInfo = NULL;
} }
ctgInfo("numOfVgroup:%d", taosHashGetSize(dbInfo->vgInfo));
if (NULL == vgInfo) { if (NULL == vgInfo) {
ctgError("no hash range found for hash value [%u], numOfVgId:%d", hashValue, taosHashGetSize(dbInfo->vgInfo)); ctgError("no hash range found for hash value [%u], numOfVgId:%d", hashValue, taosHashGetSize(dbInfo->vgInfo));
void *pIter1 = taosHashIterate(dbInfo->vgInfo, NULL);
while (pIter1) {
vgInfo = pIter1;
ctgError("valid range:[%u, %u], vgId:%d", vgInfo->hashBegin, vgInfo->hashEnd, vgInfo->vgId);
pIter1 = taosHashIterate(dbInfo->vgInfo, pIter1);
}
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }

View File

@ -44,15 +44,7 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
} }
if (!isDqlSqlStatement(&info)) { if (!isDqlSqlStatement(&info)) {
// bool toVnode = false;
if (info.type == TSDB_SQL_CREATE_TABLE) { if (info.type == TSDB_SQL_CREATE_TABLE) {
// SCreateTableSql* pCreateSql = info.pCreateTableInfo;
// if (pCreateSql->type == TSQL_CREATE_CTABLE || pCreateSql->type == TSQL_CREATE_TABLE) {
// toVnode = true;
// }
// }
// if (toVnode) {
SVnodeModifOpStmtInfo * pModifStmtInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen); SVnodeModifOpStmtInfo * pModifStmtInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen);
if (pModifStmtInfo == NULL) { if (pModifStmtInfo == NULL) {
return terrno; return terrno;

View File

@ -19,8 +19,9 @@
#include "taos.h" #include "taos.h"
#include "tdef.h" #include "tdef.h"
// the add ref count operation may trigger the warning if the reference count is greater than the MAX_WARNING_REF_COUNT
#define MAX_WARNING_REF_COUNT 10000
#define EXT_SIZE 1024 #define EXT_SIZE 1024
#define HASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * HASH_DEFAULT_LOAD_FACTOR) #define HASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * HASH_DEFAULT_LOAD_FACTOR)
#define DO_FREE_HASH_NODE(_n) \ #define DO_FREE_HASH_NODE(_n) \
@ -907,8 +908,24 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) {
if (pNode) { if (pNode) {
SHashEntry *pe = pHashObj->hashList[slot]; SHashEntry *pe = pHashObj->hashList[slot];
pNode->count++;
uint16_t prevRef = atomic_load_16(&pNode->count);
uint16_t afterRef = atomic_add_fetch_16(&pNode->count, 1);
// the reference count value is overflow, which will cause the delete node operation immediately.
if (prevRef > afterRef) {
uError("hash entry ref count overflow, prev ref:%d, current ref:%d", prevRef, afterRef);
// restore the value
atomic_sub_fetch_16(&pNode->count, 1);
data = NULL;
} else {
data = GET_HASH_NODE_DATA(pNode); data = GET_HASH_NODE_DATA(pNode);
}
if (afterRef >= MAX_WARNING_REF_COUNT) {
uWarn("hash entry ref count is abnormally high: %d", afterRef);
}
if (pHashObj->type == HASH_ENTRY_LOCK) { if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWUnLockLatch(&pe->latch); taosWUnLockLatch(&pe->latch);
} }
@ -916,7 +933,6 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) {
__rd_unlock((void*) &pHashObj->lock, pHashObj->type); __rd_unlock((void*) &pHashObj->lock, pHashObj->type);
return data; return data;
} }
void taosHashCancelIterate(SHashObj *pHashObj, void *p) { void taosHashCancelIterate(SHashObj *pHashObj, void *p) {