Merge remote-tracking branch 'origin/3.0' into feature/qnode

This commit is contained in:
dapan1121 2022-01-27 10:58:02 +08:00
commit 5daa79afef
24 changed files with 1076 additions and 678 deletions

View File

@ -92,14 +92,16 @@ typedef struct taosField {
typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code);
DLL_EXPORT void taos_cleanup(void);
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port);
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
DLL_EXPORT void taos_close(TAOS *taos);
typedef struct tmq_t tmq_t;
typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t;
const char *taos_data_type(int type);
typedef struct tmq_message_t tmq_message_t;
typedef struct tmq_message_topic_t tmq_message_topic_t;
typedef struct tmq_message_tb_t tmq_message_tb_t;
typedef struct tmq_tb_iter_t tmq_tb_iter_t;
typedef struct tmq_message_col_t tmq_message_col_t;
typedef struct tmq_col_iter_t tmq_col_iter_t;
typedef struct TAOS_BIND {
int buffer_type;
@ -134,6 +136,15 @@ typedef struct TAOS_MULTI_BIND {
int num;
} TAOS_MULTI_BIND;
DLL_EXPORT void taos_cleanup(void);
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port);
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
DLL_EXPORT void taos_close(TAOS *taos);
const char *taos_data_type(int type);
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags);
@ -192,16 +203,6 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList);
DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision);
typedef struct tmq_t tmq_t;
typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t;
typedef struct tmq_message_t tmq_message_t;
typedef struct tmq_message_topic_t tmq_message_topic_t;
typedef struct tmq_message_tb_t tmq_message_tb_t;
typedef struct tmq_tb_iter_t tmq_tb_iter_t;
typedef struct tmq_message_col_t tmq_message_col_t;
typedef struct tmq_col_iter_t tmq_col_iter_t;
DLL_EXPORT tmq_list_t* tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t*, char*);

View File

@ -38,6 +38,12 @@
// int16_t bytes;
//} SSchema;
typedef struct {
uint32_t numOfTables;
SArray *pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo;
typedef struct SColumnDataAgg {
int16_t colId;
int64_t sum;
@ -57,17 +63,12 @@ typedef struct SDataBlockInfo {
typedef struct SConstantItem {
SColumnInfo info;
int32_t startIndex; // run-length-encoding to save the space for multiple rows
int32_t endIndex;
int32_t startRow; // run-length-encoding to save the space for multiple rows
int32_t endRow;
SVariant value;
} SConstantItem;
typedef struct {
uint32_t numOfTables;
SArray *pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo;
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
typedef struct SSDataBlock {
SColumnDataAgg *pBlockAgg;
SArray *pDataBlock; // SArray<SColumnInfoData>
@ -75,9 +76,12 @@ typedef struct SSDataBlock {
SDataBlockInfo info;
} SSDataBlock;
// pBlockAgg->numOfNull == info.rows, all data are null
// pBlockAgg->numOfNull == 0, no data are null.
typedef struct SColumnInfoData {
SColumnInfo info; // TODO filter info needs to be removed
char *pData; // the corresponding block data in memory
SColumnInfo info; // TODO filter info needs to be removed
char *nullbitmap;//
char *pData; // the corresponding block data in memory
} SColumnInfoData;
static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {

View File

@ -296,6 +296,7 @@ static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) {
}
return buf;
}
typedef struct {
int32_t acctId;
int64_t clusterId;
@ -303,6 +304,7 @@ typedef struct {
int8_t superUser;
int8_t align[3];
SEpSet epSet;
char sVersion[128];
} SConnectRsp;
typedef struct {
@ -513,21 +515,26 @@ typedef struct {
} SAlterDbReq;
typedef struct {
char db[TSDB_TABLE_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int8_t ignoreNotExists;
} SDropDbReq;
typedef struct {
char db[TSDB_TABLE_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
uint64_t uid;
} SDropDbRsp;
typedef struct {
char db[TSDB_DB_FNAME_LEN];
int32_t vgVersion;
} SUseDbReq;
typedef struct {
char db[TSDB_TABLE_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
} SSyncDbReq;
typedef struct {
char db[TSDB_TABLE_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
} SCompactDbReq;
typedef struct {

View File

@ -122,6 +122,7 @@ typedef struct STscObj {
char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN];
char db[TSDB_DB_FNAME_LEN];
char ver[128];
int32_t acctId;
uint32_t connId;
int32_t connType;

View File

@ -218,12 +218,10 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag,
if (pQueryNode->type == TSDB_SQL_SELECT) {
setResSchemaInfo(&pRequest->body.resInfo, pSchema, numOfCols);
tfree(pSchema);
pRequest->type = TDMT_VND_QUERY;
} else {
tfree(pSchema);
}
tfree(pSchema);
return code;
}

View File

@ -277,3 +277,70 @@ int taos_affected_rows(TAOS_RES *res) {
}
int taos_result_precision(TAOS_RES *res) { return TSDB_TIME_PRECISION_MILLI; }
int taos_select_db(TAOS *taos, const char *db) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return TSDB_CODE_TSC_DISCONNECTED;
}
if (db == NULL || strlen(db) == 0) {
terrno = TSDB_CODE_TSC_INVALID_INPUT;
return terrno;
}
char sql[256] = {0};
snprintf(sql, tListLen(sql), "use %s", db);
TAOS_RES* pRequest = taos_query(taos, sql);
int32_t code = taos_errno(pRequest);
taos_free_result(pRequest);
return code;
}
void taos_stop_query(TAOS_RES *res) {
if (res == NULL) {
return;
}
SRequestObj* pRequest = (SRequestObj*) res;
int32_t numOfFields = taos_num_fields(pRequest);
// It is not a query, no need to stop.
if (numOfFields == 0) {
return;
}
// scheduleCancelJob(pRequest->body.pQueryJob);
}
bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
return false;
}
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
return 0;
}
int taos_validate_sql(TAOS *taos, const char *sql) {
return true;
}
const char *taos_get_server_info(TAOS *taos) {
if (taos == NULL) {
return NULL;
}
STscObj* pTscObj = (STscObj*) taos;
return pTscObj->ver;
}
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
// TODO
}
void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
// TODO
}

View File

@ -68,6 +68,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pTscObj->connId = pConnect->connId;
pTscObj->acctId = pConnect->acctId;
tstrncpy(pTscObj->ver, pConnect->sVersion, tListLen(pTscObj->ver));
// update the appInstInfo
pTscObj->pAppInfo->clusterId = pConnect->clusterId;

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,5 @@
#include "tep.h"
#include "common.h"
#include "tglobal.h"
#include "tlockfree.h"
@ -59,3 +60,99 @@ SEpSet getEpSet_s(SCorEpSet *pEpSet) {
return ep;
}
bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row, SColumnDataAgg* pColAgg) {
if (pColAgg != NULL) {
if (pColAgg->numOfNull == totalRows) {
ASSERT(pColumnInfoData->nullbitmap == NULL);
return true;
} else if (pColAgg->numOfNull == 0) {
ASSERT(pColumnInfoData->nullbitmap == NULL);
return false;
}
}
if (pColumnInfoData->nullbitmap == NULL) {
return false;
}
uint8_t v = (pColumnInfoData->nullbitmap[row>>3] & (1<<(8 - (row&0x07))));
return (v == 1);
}
bool colDataIsNull_f(const char* bitmap, uint32_t row) {
return (bitmap[row>>3] & (1<<(8 - (row&0x07))));
}
void colDataSetNull_f(char* bitmap, uint32_t row) { // TODO
return;
}
void* colDataGet(const SColumnInfoData* pColumnInfoData, uint32_t row) {
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
uint32_t offset = ((uint32_t*)pColumnInfoData->pData)[row];
return (char*)(pColumnInfoData->pData) + offset; // the first part is the pointer to the true binary data
} else {
return (char*)(pColumnInfoData->pData) + (row * pColumnInfoData->info.bytes);
}
}
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull) {
ASSERT(pColumnInfoData != NULL);
if (isNull) {
// TODO set null value in the nullbitmap
return 0;
}
int32_t type = pColumnInfoData->info.type;
if (IS_VAR_DATA_TYPE(type)) {
// TODO continue append var_type
} else {
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
switch(type) {
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT: {*(int8_t*) p = *(int8_t*) pData;break;}
default:
assert(0);
}
}
return 0;
}
size_t colDataGetCols(const SSDataBlock* pBlock) {
ASSERT(pBlock);
size_t constantCols = (pBlock->pConstantList != NULL)? taosArrayGetSize(pBlock->pConstantList):0;
ASSERT( pBlock->info.numOfCols == taosArrayGetSize(pBlock->pDataBlock) + constantCols);
return pBlock->info.numOfCols;
}
size_t colDataGetRows(const SSDataBlock* pBlock) {
return pBlock->info.rows;
}
int32_t colDataUpdateTsWindow(SSDataBlock* pDataBlock) {
if (pDataBlock == NULL || pDataBlock->info.rows <= 0) {
return 0;
}
if (pDataBlock->info.numOfCols <= 0) {
return -1;
}
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, 0);
if (pColInfoData->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
return 0;
}
ASSERT(pColInfoData->nullbitmap == NULL);
pDataBlock->info.window.skey = *(TSKEY*) colDataGet(pColInfoData, 0);
pDataBlock->info.window.ekey = *(TSKEY*) colDataGet(pColInfoData, (pDataBlock->info.rows - 1));
return 0;
}

View File

@ -128,6 +128,8 @@ typedef struct {
int32_t failedTimes;
void* rpcHandle;
void* rpcAHandle;
void* rpcRsp;
int32_t rpcRspLen;
SArray* redoLogs;
SArray* undoLogs;
SArray* commitLogs;

View File

@ -36,16 +36,17 @@ typedef struct {
int32_t mndInitTrans(SMnode *pMnode);
void mndCleanupTrans(SMnode *pMnode);
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg);
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, const SRpcMsg *pReq);
void mndTransDrop(STrans *pTrans);
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
void mndTransProcessRsp(SMnodeMsg *pMsg);
void mndTransProcessRsp(SMnodeMsg *pRsp);
void mndTransPullup(SMnode *pMnode);
#ifdef __cplusplus

View File

@ -767,6 +767,14 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb) {
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
int32_t rspLen = sizeof(SDropDbRsp);
SDropDbRsp *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) goto DROP_DB_OVER;
memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN);
pRsp->uid = htobe64(pDb->uid);
mndTransSetRpcRsp(pTrans, pRsp, rspLen);
if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_DB_OVER;
code = 0;

View File

@ -237,13 +237,11 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
if (pIter == NULL) break;
if (pObj->pDnode == NULL) break;
pEpSet->eps[pEpSet->numOfEps].port = htons(pObj->pDnode->port);
memcpy(pEpSet->eps[pEpSet->numOfEps].fqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN);
if (pObj->role == TAOS_SYNC_STATE_LEADER) {
pEpSet->inUse = pEpSet->numOfEps;
}
pEpSet->numOfEps++;
addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, htons(pObj->pDnode->port));
sdbRelease(pSdb, pObj);
}
}

View File

@ -14,14 +14,15 @@
*/
#define _DEFAULT_SOURCE
#include "tglobal.h"
#include "mndProfile.h"
#include "mndConsumer.h"
//#include "mndConsumer.h"
#include "mndDb.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndTopic.h"
//#include "mndTopic.h"
#include "mndUser.h"
#include "mndVgroup.h"
//#include "mndVgroup.h"
#define QUERY_ID_SIZE 20
#define QUERY_OBJ_ID_SIZE 18
@ -230,10 +231,12 @@ static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
goto CONN_OVER;
}
pRsp->acctId = htonl(pUser->acctId);
pRsp->acctId = htonl(pUser->acctId);
pRsp->superUser = pUser->superUser;
pRsp->clusterId = htobe64(pMnode->clusterId);
pRsp->connId = htonl(pConn->id);
pRsp->connId = htonl(pConn->id);
snprintf(pRsp->sVersion, tListLen(pRsp->sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, gitinfo);
mndGetMnodeEpSet(pMnode, &pRsp->epSet);
pReq->contLen = sizeof(SConnectRsp);

View File

@ -308,6 +308,11 @@ static void mndTransDropData(STrans *pTrans) {
mndTransDropLogs(pTrans->commitLogs);
mndTransDropActions(pTrans->redoActions);
mndTransDropActions(pTrans->undoActions);
if (pTrans->rpcRsp != NULL) {
rpcFreeCont(pTrans->rpcRsp);
pTrans->rpcRsp = NULL;
pTrans->rpcRspLen = 0;
}
}
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
@ -339,7 +344,7 @@ void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) {
sdbRelease(pSdb, pTrans);
}
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg) {
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, const SRpcMsg *pReq) {
STrans *pTrans = calloc(1, sizeof(STrans));
if (pTrans == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -350,8 +355,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg) {
pTrans->id = sdbGetMaxId(pMnode->pSdb, SDB_TRANS);
pTrans->stage = TRN_STAGE_PREPARE;
pTrans->policy = policy;
pTrans->rpcHandle = pMsg->handle;
pTrans->rpcAHandle = pMsg->ahandle;
pTrans->rpcHandle = pReq->handle;
pTrans->rpcAHandle = pReq->ahandle;
pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
@ -436,6 +441,11 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) {
return mndTransAppendAction(pTrans->undoActions, pAction);
}
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen) {
pTrans->rpcRsp = pCont;
pTrans->rpcRspLen = contLen;
}
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
SSdbRaw *pRaw = mndTransActionEncode(pTrans);
if (pRaw == NULL) {
@ -479,6 +489,11 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
pNew->rpcHandle = pTrans->rpcHandle;
pNew->rpcAHandle = pTrans->rpcAHandle;
pNew->rpcRsp = pTrans->rpcRsp;
pNew->rpcRspLen = pTrans->rpcRspLen;
pTrans->rpcRsp = NULL;
pTrans->rpcRspLen = 0;
mndTransExecute(pMnode, pNew);
mndReleaseTrans(pMnode, pNew);
return 0;
@ -529,15 +544,21 @@ static void mndTransSendRpcRsp(STrans *pTrans) {
if (sendRsp && pTrans->rpcHandle != NULL) {
mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage,
pTrans->rpcAHandle);
SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = pTrans->code, .ahandle = pTrans->rpcAHandle};
SRpcMsg rspMsg = {.handle = pTrans->rpcHandle,
.code = pTrans->code,
.ahandle = pTrans->rpcAHandle,
.pCont = pTrans->rpcRsp,
.contLen = pTrans->rpcRspLen};
rpcSendResponse(&rspMsg);
pTrans->rpcHandle = NULL;
pTrans->rpcRsp = NULL;
pTrans->rpcRspLen = 0;
}
}
void mndTransProcessRsp(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
int64_t signature = (int64_t)(pMsg->rpcMsg.ahandle);
void mndTransProcessRsp(SMnodeMsg *pRsp) {
SMnode *pMnode = pRsp->pMnode;
int64_t signature = (int64_t)(pRsp->rpcMsg.ahandle);
int32_t transId = (int32_t)(signature >> 32);
int32_t action = (int32_t)((signature << 32) >> 32);
@ -571,10 +592,10 @@ void mndTransProcessRsp(SMnodeMsg *pMsg) {
STransAction *pAction = taosArrayGet(pArray, action);
if (pAction != NULL) {
pAction->msgReceived = 1;
pAction->errCode = pMsg->rpcMsg.code;
pAction->errCode = pRsp->rpcMsg.code;
}
mDebug("trans:%d, action:%d response is received, code:0x%x, accept:0x%x", transId, action, pMsg->rpcMsg.code,
mDebug("trans:%d, action:%d response is received, code:0x%x, accept:0x%x", transId, action, pRsp->rpcMsg.code,
pAction->acceptableCode);
mndTransExecute(pMnode, pTrans);
@ -921,7 +942,7 @@ static int32_t mndProcessTransMsg(SMnodeMsg *pMsg) {
void mndTransPullup(SMnode *pMnode) {
STrans *pTrans = NULL;
void * pIter = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans);

View File

@ -202,6 +202,10 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DB, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
SDropDbRsp* pDrop = (SDropDbRsp*)pRsp->pCont;
pDrop->uid = htobe64(pDrop->uid);
EXPECT_STREQ(pDrop->db, "1.d1");
}
test.SendShowMetaReq(TSDB_MGMT_TABLE_DB, "");
@ -249,6 +253,8 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
EXPECT_EQ(test.GetShowRows(), 1);
CheckBinary("d2", TSDB_DB_NAME_LEN - 1);
uint64_t d2_uid = 0;
{
int32_t contLen = sizeof(SUseDbReq);
@ -262,6 +268,8 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
SUseDbRsp* pRsp = (SUseDbRsp*)pMsg->pCont;
EXPECT_STREQ(pRsp->db, "1.d2");
pRsp->uid = htobe64(pRsp->uid);
d2_uid = pRsp->uid;
pRsp->vgVersion = htonl(pRsp->vgVersion);
pRsp->vgNum = htonl(pRsp->vgNum);
pRsp->hashMethod = pRsp->hashMethod;
@ -311,5 +319,10 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DB, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
SDropDbRsp* pDrop = (SDropDbRsp*)pRsp->pCont;
pDrop->uid = htobe64(pDrop->uid);
EXPECT_STREQ(pDrop->db, "1.d2");
EXPECT_EQ(pDrop->uid, d2_uid);
}
}

View File

@ -1125,8 +1125,6 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
*str = cJSON_Print(json);
cJSON_Delete(json);
// printf("====Physical plan:====\n");
// printf("%s\n", *str);
*len = strlen(*str) + 1;
return TSDB_CODE_SUCCESS;
}

View File

@ -40,7 +40,8 @@ void qDestroyQueryDag(struct SQueryDag* pDag) {
tfree(pDag);
}
int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, SSchema** pResSchema, int32_t* numOfCols, SArray* pNodeList, uint64_t requestId) {
int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, SSchema** pResSchema, int32_t* numOfCols, SArray* pNodeList,
uint64_t requestId) {
SQueryPlanNode* pLogicPlan;
int32_t code = createQueryPlan(pNode, &pLogicPlan);
if (TSDB_CODE_SUCCESS != code) {
@ -49,9 +50,10 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag,
}
if (pLogicPlan->info.type != QNODE_MODIFY) {
// char* str = NULL;
// queryPlanToString(pLogicPlan, &str);
// printf("%s\n", str);
char* str = NULL;
queryPlanToString(pLogicPlan, &str);
qDebug("reqId:0x%"PRIx64": %s", requestId, str);
tfree(str);
}
code = optimizeQueryPlan(pLogicPlan);

View File

@ -1194,17 +1194,18 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
code = atomic_load_32(&pJob->errCode);
SCH_ERR_RET(code);
SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
SSubplan *plan = pTask->plan;
if (NULL == pTask->msg) {
if (NULL == pTask->msg) { // TODO add more detailed reason for failure
code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen);
if (TSDB_CODE_SUCCESS != code || NULL == pTask->msg || pTask->msgLen <= 0) {
SCH_TASK_ELOG("subplanToString error, code:%x, msg:%p, len:%d", code, pTask->msg, pTask->msgLen);
SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, pTask->msgLen);
SCH_ERR_JRET(code);
} else {
SCH_TASK_DLOG(" ===physical plan=== len:%d, %s", pTask->msgLen, pTask->msg);
}
}
@ -1218,13 +1219,10 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
}
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));
return TSDB_CODE_SUCCESS;
_return:
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, code));
SCH_RET(code);
}

View File

@ -45,13 +45,13 @@ typedef struct SCliThrdObj {
pthread_t thread;
uv_loop_t* loop;
uv_async_t* cliAsync; //
uv_timer_t* pTimer;
uv_timer_t* timer;
void* pool; // conn pool
queue msg;
pthread_mutex_t msgMtx;
uint64_t nextTimeout; // next timeout
void* pTransInst; //
bool quit;
} SCliThrdObj;
typedef struct SClientObj {
@ -94,6 +94,8 @@ static void clientHandleResp(SCliConn* conn);
static void clientHandleExcept(SCliConn* conn);
// handle req from app
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientSendQuit(SCliThrdObj* thrd);
static void destroyUserdata(SRpcMsg* userdata);
@ -136,8 +138,8 @@ static void clientHandleResp(SCliConn* conn) {
destroyCmsg(pMsg);
conn->data = NULL;
// start thread's timer of conn pool if not active
if (!uv_is_active((uv_handle_t*)pThrd->pTimer) && pRpc->idleTime > 0) {
uv_timer_start((uv_timer_t*)pThrd->pTimer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) {
uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
}
}
static void clientHandleExcept(SCliConn* pConn) {
@ -155,7 +157,7 @@ static void clientHandleExcept(SCliConn* pConn) {
SRpcMsg rpcMsg = {0};
rpcMsg.ahandle = pCtx->ahandle;
rpcMsg.code = -1;
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
(pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL);
pConn->notifyCount += 1;
@ -332,9 +334,8 @@ static void clientWriteCb(uv_write_t* req, int status) {
tDebug("conn %p data already was written out", pConn);
SCliMsg* pMsg = pConn->data;
if (pMsg == NULL) {
destroy
// handle
return;
// handle
return;
}
destroyUserdata(&pMsg->msg);
} else {
@ -375,6 +376,15 @@ static void clientConnCb(uv_connect_t* req, int status) {
clientWrite(pConn);
}
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
tDebug("thread %p start to quit", pThrd);
destroyCmsg(pMsg);
uv_close((uv_handle_t*)pThrd->cliAsync, NULL);
uv_timer_stop(pThrd->timer);
pThrd->quit = true;
// uv__async_stop(pThrd->cliAsync);
uv_stop(pThrd->loop);
}
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
uint64_t et = taosGetTimestampUs();
uint64_t el = et - pMsg->st;
@ -389,7 +399,13 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
conn->data = pMsg;
conn->writeReq->data = conn;
transDestroyBuffer(&conn->readBuf);
if (pThrd->quit) {
clientHandleExcept(conn);
return;
}
clientWrite(conn);
} else {
SCliConn* conn = calloc(1, sizeof(SCliConn));
conn->ref++;
@ -430,7 +446,12 @@ static void clientAsyncCb(uv_async_t* handle) {
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
clientHandleReq(pMsg, pThrd);
if (pMsg->ctx == NULL) {
clientHandleQuit(pMsg, pThrd);
} else {
clientHandleReq(pMsg, pThrd);
}
// clientHandleReq(pMsg, pThrd);
count++;
}
if (count >= 2) {
@ -458,7 +479,7 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd));
if (err == 0) {
tDebug("sucess to create tranport-client thread %d", i);
tDebug("success to create tranport-client thread %d", i);
}
cli->pThreadObj[i] = pThrd;
}
@ -492,20 +513,24 @@ static SCliThrdObj* createThrdObj() {
uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb);
pThrd->cliAsync->data = pThrd;
pThrd->pTimer = malloc(sizeof(uv_timer_t));
uv_timer_init(pThrd->loop, pThrd->pTimer);
pThrd->pTimer->data = pThrd;
pThrd->timer = malloc(sizeof(uv_timer_t));
uv_timer_init(pThrd->loop, pThrd->timer);
pThrd->timer->data = pThrd;
pThrd->pool = creatConnPool(1);
pThrd->quit = false;
return pThrd;
}
static void destroyThrdObj(SCliThrdObj* pThrd) {
if (pThrd == NULL) {
return;
}
uv_stop(pThrd->loop);
pthread_join(pThrd->thread, NULL);
pthread_mutex_destroy(&pThrd->msgMtx);
free(pThrd->cliAsync);
free(pThrd->timer);
free(pThrd->loop);
free(pThrd);
}
@ -517,10 +542,22 @@ static void transDestroyConnCtx(STransConnCtx* ctx) {
free(ctx);
}
//
static void clientSendQuit(SCliThrdObj* thrd) {
// cli can stop gracefully
SCliMsg* msg = calloc(1, sizeof(SCliMsg));
msg->ctx = NULL; //
pthread_mutex_lock(&thrd->msgMtx);
QUEUE_PUSH(&thrd->msg, &msg->q);
pthread_mutex_unlock(&thrd->msgMtx);
uv_async_send(thrd->cliAsync);
}
void taosCloseClient(void* arg) {
// impl later
SClientObj* cli = arg;
for (int i = 0; i < cli->numOfThreads; i++) {
clientSendQuit(cli->pThreadObj[i]);
destroyThrdObj(cli->pThreadObj[i]);
}
free(cli->pThreadObj);

View File

@ -70,6 +70,7 @@ typedef struct SServerObj {
uv_pipe_t** pipe;
uint32_t ip;
uint32_t port;
uv_async_t* pAcceptAsync; // just to quit from from accept thread
} SServerObj;
static const char* notify = "a";
@ -88,9 +89,11 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status);
static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle);
static void uvAcceptAsyncCb(uv_async_t* handle);
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg);
static void destroySmsg(SSrvMsg* smsg);
// check whether already read complete packet
static bool readComplete(SConnBuffer* buf);
@ -389,7 +392,13 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
tError("except occurred, continue");
continue;
}
uvStartSendResp(msg);
if (msg->pConn == NULL) {
//
free(msg);
uv_stop(pThrd->loop);
} else {
uvStartSendResp(msg);
}
// uv_buf_t wb;
// uvPrepareSendData(msg, &wb);
// uv_timer_stop(conn->pTimer);
@ -397,6 +406,10 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
// uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
}
}
static void uvAcceptAsyncCb(uv_async_t* async) {
SServerObj* srv = async->data;
uv_stop(srv->loop);
}
void uvOnAcceptCb(uv_stream_t* stream, int status) {
if (status == -1) {
@ -517,8 +530,12 @@ static bool addHandleToAcceptloop(void* arg) {
return false;
}
struct sockaddr_in bind_addr;
// register an async here to quit server gracefully
srv->pAcceptAsync = calloc(1, sizeof(uv_async_t));
uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
srv->pAcceptAsync->data = srv;
struct sockaddr_in bind_addr;
uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
tError("failed to bind: %s", uv_err_name(err));
@ -647,21 +664,42 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
return;
}
pthread_join(pThrd->thread, NULL);
// free(srv->pipe[i]);
free(pThrd->loop);
pthread_mutex_destroy(&pThrd->msgMtx);
free(pThrd->workerAsync);
free(pThrd);
}
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
pthread_mutex_lock(&pThrd->msgMtx);
QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
pthread_mutex_unlock(&pThrd->msgMtx);
tDebug("send quit msg to work thread");
uv_async_send(pThrd->workerAsync);
}
void taosCloseServer(void* arg) {
// impl later
SServerObj* srv = arg;
for (int i = 0; i < srv->numOfThreads; i++) {
sendQuitToWorkThrd(srv->pThreadObj[i]);
destroyWorkThrd(srv->pThreadObj[i]);
}
free(srv->loop);
free(srv->pipe);
free(srv->pThreadObj);
tDebug("send quit msg to accept thread");
uv_async_send(srv->pAcceptAsync);
pthread_join(srv->thread, NULL);
free(srv->pThreadObj);
free(srv->pAcceptAsync);
free(srv->loop);
for (int i = 0; i < srv->numOfThreads; i++) {
free(srv->pipe[i]);
}
free(srv->pipe);
free(srv);
}

View File

@ -1,6 +1,12 @@
add_executable(transportTest "")
add_executable(client "")
add_executable(server "")
add_executable(transUT "")
target_sources(transUT
PRIVATE
"transUT.cc"
)
target_sources(transportTest
PRIVATE
@ -28,6 +34,13 @@ target_link_libraries (transportTest
gtest_main
transport
)
target_link_libraries (transUT
os
util
common
gtest_main
transport
)
target_include_directories(client
PUBLIC
@ -48,6 +61,13 @@ target_include_directories(server
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(transUT
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/transport"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries (server
os
util

View File

@ -0,0 +1,82 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <cstdio>
#include <cstring>
#include "trpc.h"
using namespace std;
class TransObj {
public:
TransObj() {
const char *label = "APP";
const char *secret = "secret";
const char *user = "user";
const char *ckey = "ckey";
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
rpcInit.label = (char *)label;
rpcInit.numOfThreads = 5;
rpcInit.cfp = NULL;
rpcInit.sessions = 100;
rpcInit.idleTime = 100;
rpcInit.user = (char *)user;
rpcInit.secret = (char *)secret;
rpcInit.ckey = (char *)ckey;
rpcInit.spi = 1;
}
bool startCli() {
trans = NULL;
rpcInit.connType = TAOS_CONN_CLIENT;
trans = rpcOpen(&rpcInit);
return trans != NULL ? true : false;
}
bool startSrv() {
trans = NULL;
rpcInit.connType = TAOS_CONN_SERVER;
trans = rpcOpen(&rpcInit);
return trans != NULL ? true : false;
}
bool stop() {
rpcClose(trans);
trans = NULL;
return true;
}
private:
void * trans;
SRpcInit rpcInit;
};
class TransEnv : public ::testing::Test {
protected:
virtual void SetUp() {
// set up trans obj
tr = new TransObj();
}
virtual void TearDown() {
// tear down
delete tr;
}
TransObj *tr = NULL;
};
TEST_F(TransEnv, test_start_stop) {
assert(tr->startCli());
assert(tr->stop());
assert(tr->startSrv());
assert(tr->stop());
}

View File

@ -360,6 +360,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
}
} else {
int num_rows_affacted = taos_affected_rows(pSql);
taos_free_result(pSql);
et = taosGetTimestampUs();
printf("Query OK, %d of %d row(s) in database (%.6fs)\n", num_rows_affacted, num_rows_affacted, (et - st) / 1E6);
}