Merge branch '3.0' into cpwu/3.0

This commit is contained in:
cpwu 2022-05-09 12:33:08 +08:00
commit c7b8a5fbe8
33 changed files with 1231 additions and 216 deletions

View File

@ -23,7 +23,6 @@ extern "C" {
#define TDENGINE_SYSTABLE_H #define TDENGINE_SYSTABLE_H
#define TSDB_INFORMATION_SCHEMA_DB "information_schema" #define TSDB_INFORMATION_SCHEMA_DB "information_schema"
#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema"
#define TSDB_INS_TABLE_DNODES "dnodes" #define TSDB_INS_TABLE_DNODES "dnodes"
#define TSDB_INS_TABLE_MNODES "mnodes" #define TSDB_INS_TABLE_MNODES "mnodes"
#define TSDB_INS_TABLE_MODULES "modules" #define TSDB_INS_TABLE_MODULES "modules"
@ -44,27 +43,27 @@ extern "C" {
#define TSDB_INS_TABLE_VNODES "vnodes" #define TSDB_INS_TABLE_VNODES "vnodes"
#define TSDB_INS_TABLE_CONFIGS "configs" #define TSDB_INS_TABLE_CONFIGS "configs"
#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" #define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema"
#define TSDB_PERFS_TABLE_SMAS "smas" #define TSDB_PERFS_TABLE_SMAS "smas"
#define TSDB_PERFS_TABLE_SUBSCRIBES "subscribes" #define TSDB_PERFS_TABLE_SUBSCRIBES "subscribes"
#define TSDB_PERFS_TABLE_CONNECTIONS "connections" #define TSDB_PERFS_TABLE_CONNECTIONS "connections"
#define TSDB_PERFS_TABLE_QUERIES "queries" #define TSDB_PERFS_TABLE_QUERIES "queries"
#define TSDB_PERFS_TABLE_TOPICS "topics" #define TSDB_PERFS_TABLE_TOPICS "topics"
#define TSDB_PERFS_TABLE_CONSUMERS "consumers" #define TSDB_PERFS_TABLE_CONSUMERS "consumers"
#define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions" #define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions"
#define TSDB_PERFS_TABLE_OFFSETS "offsets" #define TSDB_PERFS_TABLE_OFFSETS "offsets"
#define TSDB_PERFS_TABLE_TRANS "trans" #define TSDB_PERFS_TABLE_TRANS "trans"
#define TSDB_PERFS_TABLE_STREAMS "streams" #define TSDB_PERFS_TABLE_STREAMS "streams"
typedef struct SSysDbTableSchema { typedef struct SSysDbTableSchema {
const char *name; const char* name;
const int32_t type; const int32_t type;
const int32_t bytes; const int32_t bytes;
} SSysDbTableSchema; } SSysDbTableSchema;
typedef struct SSysTableMeta { typedef struct SSysTableMeta {
const char *name; const char* name;
const SSysDbTableSchema *schema; const SSysDbTableSchema* schema;
const int32_t colNum; const int32_t colNum;
} SSysTableMeta; } SSysTableMeta;

View File

@ -17,8 +17,8 @@
#define _TD_COMMON_GLOBAL_H_ #define _TD_COMMON_GLOBAL_H_
#include "tarray.h" #include "tarray.h"
#include "tdef.h"
#include "tconfig.h" #include "tconfig.h"
#include "tdef.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -121,15 +121,16 @@ extern char tsCompressor[];
extern int32_t tsDiskCfgNum; extern int32_t tsDiskCfgNum;
extern SDiskCfg tsDiskCfg[]; extern SDiskCfg tsDiskCfg[];
// internal // internal
extern int32_t tsTransPullupMs; extern int32_t tsTransPullupInterval;
extern int32_t tsMaRebalanceMs; extern int32_t tsMqRebalanceInterval;
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, const char *envFile, int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
char *apolloUrl, SArray *pArgs, bool tsc); const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc);
int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc); int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs,
bool tsc);
void taosCleanupCfg(); void taosCleanupCfg();
void taosCfgDynamicOptions(const char *option, const char *value); void taosCfgDynamicOptions(const char *option, const char *value);
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary); void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary);

View File

@ -204,7 +204,6 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp)
// sync integration
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_TIMEOUT, "vnode-sync-timeout", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SYNC_TIMEOUT, "vnode-sync-timeout", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING, "vnode-sync-ping", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING, "vnode-sync-ping", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING_REPLY, "vnode-sync-ping-reply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING_REPLY, "vnode-sync-ping-reply", NULL, NULL)

View File

@ -411,7 +411,6 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_INVALID_MSGLEN TAOS_DEF_ERROR_CODE(0, 0x0909) #define TSDB_CODE_SYN_INVALID_MSGLEN TAOS_DEF_ERROR_CODE(0, 0x0909)
#define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A) #define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A)
// sync integration
#define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x0910) #define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x0910)
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF) #define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)

View File

@ -211,6 +211,7 @@ static const SSysDbTableSchema transSchema[] = {
{.name = "stage", .bytes = TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "stage", .bytes = TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "type", .bytes = TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "type", .bytes = TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "last_error", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "last_error", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
}; };

View File

@ -170,8 +170,8 @@ uint32_t tsCurRange = 100; // range
char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR
// internal // internal
int32_t tsTransPullupMs = 6000; int32_t tsTransPullupInterval = 6;
int32_t tsMaRebalanceMs = 2000; int32_t tsMqRebalanceInterval = 2;
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) { void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) {
tstrncpy(tsDiskCfg[index].dir, v1, TSDB_FILENAME_LEN); tstrncpy(tsDiskCfg[index].dir, v1, TSDB_FILENAME_LEN);
@ -438,6 +438,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddString(pCfg, "telemetryServer", tsTelemServer, 0) != 0) return -1; if (cfgAddString(pCfg, "telemetryServer", tsTelemServer, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "telemetryPort", tsTelemPort, 1, 65056, 0) != 0) return -1; if (cfgAddInt32(pCfg, "telemetryPort", tsTelemPort, 1, 65056, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1;
return 0; return 0;
} }
@ -575,6 +578,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN); tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN);
tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32; tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32;
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32;
if (tsQueryBufferSize >= 0) { if (tsQueryBufferSize >= 0) {
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
} }

View File

@ -51,7 +51,7 @@ int32_t dmReadEps(SDnode *pDnode) {
pDnode->data.dnodeEps = taosArrayInit(1, sizeof(SDnodeEp)); pDnode->data.dnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
if (pDnode->data.dnodeEps == NULL) { if (pDnode->data.dnodeEps == NULL) {
dError("failed to calloc dnodeEp array since %s", strerror(errno)); dError("failed to calloc dnodeEp array since %s", strerror(errno));
goto PRASE_DNODE_OVER; goto _OVER;
} }
snprintf(file, sizeof(file), "%s%sdnode.json", pDnode->wrappers[DNODE].path, TD_DIRSEP); snprintf(file, sizeof(file), "%s%sdnode.json", pDnode->wrappers[DNODE].path, TD_DIRSEP);
@ -59,53 +59,53 @@ int32_t dmReadEps(SDnode *pDnode) {
if (pFile == NULL) { if (pFile == NULL) {
// dDebug("file %s not exist", file); // dDebug("file %s not exist", file);
code = 0; code = 0;
goto PRASE_DNODE_OVER; goto _OVER;
} }
len = (int32_t)taosReadFile(pFile, content, maxLen); len = (int32_t)taosReadFile(pFile, content, maxLen);
if (len <= 0) { if (len <= 0) {
dError("failed to read %s since content is null", file); dError("failed to read %s since content is null", file);
goto PRASE_DNODE_OVER; goto _OVER;
} }
content[len] = 0; content[len] = 0;
root = cJSON_Parse(content); root = cJSON_Parse(content);
if (root == NULL) { if (root == NULL) {
dError("failed to read %s since invalid json format", file); dError("failed to read %s since invalid json format", file);
goto PRASE_DNODE_OVER; goto _OVER;
} }
cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId"); cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_Number) { if (!dnodeId || dnodeId->type != cJSON_Number) {
dError("failed to read %s since dnodeId not found", file); dError("failed to read %s since dnodeId not found", file);
goto PRASE_DNODE_OVER; goto _OVER;
} }
pDnode->data.dnodeId = dnodeId->valueint; pDnode->data.dnodeId = dnodeId->valueint;
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
if (!clusterId || clusterId->type != cJSON_String) { if (!clusterId || clusterId->type != cJSON_String) {
dError("failed to read %s since clusterId not found", file); dError("failed to read %s since clusterId not found", file);
goto PRASE_DNODE_OVER; goto _OVER;
} }
pDnode->data.clusterId = atoll(clusterId->valuestring); pDnode->data.clusterId = atoll(clusterId->valuestring);
cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_Number) { if (!dropped || dropped->type != cJSON_Number) {
dError("failed to read %s since dropped not found", file); dError("failed to read %s since dropped not found", file);
goto PRASE_DNODE_OVER; goto _OVER;
} }
pDnode->data.dropped = dropped->valueint; pDnode->data.dropped = dropped->valueint;
cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes"); cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes");
if (!dnodes || dnodes->type != cJSON_Array) { if (!dnodes || dnodes->type != cJSON_Array) {
dError("failed to read %s since dnodes not found", file); dError("failed to read %s since dnodes not found", file);
goto PRASE_DNODE_OVER; goto _OVER;
} }
int32_t numOfDnodes = cJSON_GetArraySize(dnodes); int32_t numOfDnodes = cJSON_GetArraySize(dnodes);
if (numOfDnodes <= 0) { if (numOfDnodes <= 0) {
dError("failed to read %s since numOfDnodes:%d invalid", file, numOfDnodes); dError("failed to read %s since numOfDnodes:%d invalid", file, numOfDnodes);
goto PRASE_DNODE_OVER; goto _OVER;
} }
for (int32_t i = 0; i < numOfDnodes; ++i) { for (int32_t i = 0; i < numOfDnodes; ++i) {
@ -117,7 +117,7 @@ int32_t dmReadEps(SDnode *pDnode) {
cJSON *did = cJSON_GetObjectItem(node, "id"); cJSON *did = cJSON_GetObjectItem(node, "id");
if (!did || did->type != cJSON_Number) { if (!did || did->type != cJSON_Number) {
dError("failed to read %s since dnodeId not found", file); dError("failed to read %s since dnodeId not found", file);
goto PRASE_DNODE_OVER; goto _OVER;
} }
dnodeEp.id = did->valueint; dnodeEp.id = did->valueint;
@ -125,14 +125,14 @@ int32_t dmReadEps(SDnode *pDnode) {
cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn"); cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn");
if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
dError("failed to read %s since dnodeFqdn not found", file); dError("failed to read %s since dnodeFqdn not found", file);
goto PRASE_DNODE_OVER; goto _OVER;
} }
tstrncpy(dnodeEp.ep.fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); tstrncpy(dnodeEp.ep.fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
cJSON *dnodePort = cJSON_GetObjectItem(node, "port"); cJSON *dnodePort = cJSON_GetObjectItem(node, "port");
if (!dnodePort || dnodePort->type != cJSON_Number) { if (!dnodePort || dnodePort->type != cJSON_Number) {
dError("failed to read %s since dnodePort not found", file); dError("failed to read %s since dnodePort not found", file);
goto PRASE_DNODE_OVER; goto _OVER;
} }
dnodeEp.ep.port = dnodePort->valueint; dnodeEp.ep.port = dnodePort->valueint;
@ -140,7 +140,7 @@ int32_t dmReadEps(SDnode *pDnode) {
cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode"); cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode");
if (!isMnode || isMnode->type != cJSON_Number) { if (!isMnode || isMnode->type != cJSON_Number) {
dError("failed to read %s since isMnode not found", file); dError("failed to read %s since isMnode not found", file);
goto PRASE_DNODE_OVER; goto _OVER;
} }
dnodeEp.isMnode = isMnode->valueint; dnodeEp.isMnode = isMnode->valueint;
@ -151,7 +151,7 @@ int32_t dmReadEps(SDnode *pDnode) {
dDebug("succcessed to read file %s", file); dDebug("succcessed to read file %s", file);
dmPrintEps(pDnode); dmPrintEps(pDnode);
PRASE_DNODE_OVER: _OVER:
if (content != NULL) taosMemoryFree(content); if (content != NULL) taosMemoryFree(content);
if (root != NULL) cJSON_Delete(root); if (root != NULL) cJSON_Delete(root);
if (pFile != NULL) taosCloseFile(&pFile); if (pFile != NULL) taosCloseFile(&pFile);
@ -176,7 +176,7 @@ PRASE_DNODE_OVER:
int32_t dmWriteEps(SDnode *pDnode) { int32_t dmWriteEps(SDnode *pDnode) {
char file[PATH_MAX] = {0}; char file[PATH_MAX] = {0};
char realfile[PATH_MAX]; char realfile[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%sdnode.json.bak", pDnode->wrappers[DNODE].path, TD_DIRSEP); snprintf(file, sizeof(file), "%s%sdnode.json.bak", pDnode->wrappers[DNODE].path, TD_DIRSEP);
snprintf(realfile, sizeof(realfile), "%s%sdnode.json", pDnode->wrappers[DNODE].path, TD_DIRSEP); snprintf(realfile, sizeof(realfile), "%s%sdnode.json", pDnode->wrappers[DNODE].path, TD_DIRSEP);

View File

@ -105,12 +105,13 @@ void dmStopMonitorThread(SDnode *pDnode) {
} }
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SDnode * pDnode = pInfo->ahandle; SDnode *pDnode = pInfo->ahandle;
SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1; int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType;
dTrace("msg:%p, will be processed in dnode-mgmt queue", pMsg); dTrace("msg:%p, will be processed in dnode-mgmt queue", pMsg);
switch (pRpc->msgType) { switch (msgType) {
case TDMT_DND_CONFIG_DNODE: case TDMT_DND_CONFIG_DNODE:
code = dmProcessConfigReq(pDnode, pMsg); code = dmProcessConfigReq(pDnode, pMsg);
break; break;
@ -148,9 +149,14 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
break; break;
} }
if (pRpc->msgType & 1u) { if (msgType & 1u) {
if (code != 0) code = terrno; if (code != 0 && terrno != 0) code = terrno;
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code, .refId = pRpc->refId}; SRpcMsg rsp = {
.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle,
.code = code,
.refId = pMsg->rpcMsg.refId,
};
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
} }
@ -160,7 +166,13 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
} }
int32_t dmStartWorker(SDnode *pDnode) { int32_t dmStartWorker(SDnode *pDnode) {
SSingleWorkerCfg cfg = {.min = 1, .max = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessMgmtQueue, .param = pDnode}; SSingleWorkerCfg cfg = {
.min = 1,
.max = 1,
.name = "dnode-mgmt",
.fp = (FItem)dmProcessMgmtQueue,
.param = pDnode,
};
if (tSingleWorkerInit(&pDnode->data.mgmtWorker, &cfg) != 0) { if (tSingleWorkerInit(&pDnode->data.mgmtWorker, &cfg) != 0) {
dError("failed to start dnode-mgmt worker since %s", terrstr()); dError("failed to start dnode-mgmt worker since %s", terrstr());
return -1; return -1;

View File

@ -18,7 +18,11 @@
static void bmSendErrorRsp(SNodeMsg *pMsg, int32_t code) { static void bmSendErrorRsp(SNodeMsg *pMsg, int32_t code) {
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code, .refId = pMsg->rpcMsg.refId}; .handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle,
.code = code,
.refId = pMsg->rpcMsg.refId,
};
tmsgSendRsp(&rpcRsp); tmsgSendRsp(&rpcRsp);
dTrace("msg:%p, is freed", pMsg); dTrace("msg:%p, is freed", pMsg);
@ -103,7 +107,7 @@ static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
} }
int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SBnodeMgmt * pMgmt = pWrapper->pMgmt; SBnodeMgmt *pMgmt = pWrapper->pMgmt;
SMultiWorker *pWorker = &pMgmt->writeWorker; SMultiWorker *pWorker = &pMgmt->writeWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
@ -112,7 +116,7 @@ int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
int32_t bmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t bmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SBnodeMgmt * pMgmt = pWrapper->pMgmt; SBnodeMgmt *pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->monitorWorker; SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
@ -121,7 +125,12 @@ int32_t bmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
int32_t bmStartWorker(SBnodeMgmt *pMgmt) { int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
SMultiWorkerCfg cfg = {.max = 1, .name = "bnode-write", .fp = (FItems)bmProcessWriteQueue, .param = pMgmt}; SMultiWorkerCfg cfg = {
.max = 1,
.name = "bnode-write",
.fp = (FItems)bmProcessWriteQueue,
.param = pMgmt,
};
if (tMultiWorkerInit(&pMgmt->writeWorker, &cfg) != 0) { if (tMultiWorkerInit(&pMgmt->writeWorker, &cfg) != 0) {
dError("failed to start bnode-write worker since %s", terrstr()); dError("failed to start bnode-write worker since %s", terrstr());
return -1; return -1;
@ -129,7 +138,12 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
if (tsMultiProcess) { if (tsMultiProcess) {
SSingleWorkerCfg mCfg = { SSingleWorkerCfg mCfg = {
.min = 1, .max = 1, .name = "bnode-monitor", .fp = (FItem)bmProcessMonitorQueue, .param = pMgmt}; .min = 1,
.max = 1,
.name = "bnode-monitor",
.fp = (FItem)bmProcessMonitorQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
dError("failed to start bnode-monitor worker since %s", terrstr()); dError("failed to start bnode-monitor worker since %s", terrstr());
return -1; return -1;

View File

@ -22,7 +22,7 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
int32_t maxLen = 4096; int32_t maxLen = 4096;
char *content = taosMemoryCalloc(1, maxLen + 1); char *content = taosMemoryCalloc(1, maxLen + 1);
cJSON *root = NULL; cJSON *root = NULL;
char file[PATH_MAX]; char file[PATH_MAX] = {0};
TdFilePtr pFile = NULL; TdFilePtr pFile = NULL;
snprintf(file, sizeof(file), "%s%smnode.json", pMgmt->path, TD_DIRSEP); snprintf(file, sizeof(file), "%s%smnode.json", pMgmt->path, TD_DIRSEP);
@ -30,39 +30,39 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
if (pFile == NULL) { if (pFile == NULL) {
// dDebug("file %s not exist", file); // dDebug("file %s not exist", file);
code = 0; code = 0;
goto PRASE_MNODE_OVER; goto _OVER;
} }
len = (int32_t)taosReadFile(pFile, content, maxLen); len = (int32_t)taosReadFile(pFile, content, maxLen);
if (len <= 0) { if (len <= 0) {
dError("failed to read %s since content is null", file); dError("failed to read %s since content is null", file);
goto PRASE_MNODE_OVER; goto _OVER;
} }
content[len] = 0; content[len] = 0;
root = cJSON_Parse(content); root = cJSON_Parse(content);
if (root == NULL) { if (root == NULL) {
dError("failed to read %s since invalid json format", file); dError("failed to read %s since invalid json format", file);
goto PRASE_MNODE_OVER; goto _OVER;
} }
cJSON *deployed = cJSON_GetObjectItem(root, "deployed"); cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
if (!deployed || deployed->type != cJSON_Number) { if (!deployed || deployed->type != cJSON_Number) {
dError("failed to read %s since deployed not found", file); dError("failed to read %s since deployed not found", file);
goto PRASE_MNODE_OVER; goto _OVER;
} }
*pDeployed = deployed->valueint; *pDeployed = deployed->valueint;
cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes"); cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes");
if (!mnodes || mnodes->type != cJSON_Array) { if (!mnodes || mnodes->type != cJSON_Array) {
dError("failed to read %s since nodes not found", file); dError("failed to read %s since nodes not found", file);
goto PRASE_MNODE_OVER; goto _OVER;
} }
pMgmt->replica = cJSON_GetArraySize(mnodes); pMgmt->replica = cJSON_GetArraySize(mnodes);
if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) { if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) {
dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica); dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica);
goto PRASE_MNODE_OVER; goto _OVER;
} }
for (int32_t i = 0; i < pMgmt->replica; ++i) { for (int32_t i = 0; i < pMgmt->replica; ++i) {
@ -74,21 +74,21 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
cJSON *id = cJSON_GetObjectItem(node, "id"); cJSON *id = cJSON_GetObjectItem(node, "id");
if (!id || id->type != cJSON_Number) { if (!id || id->type != cJSON_Number) {
dError("failed to read %s since id not found", file); dError("failed to read %s since id not found", file);
goto PRASE_MNODE_OVER; goto _OVER;
} }
pReplica->id = id->valueint; pReplica->id = id->valueint;
cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn"); cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn");
if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) { if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
dError("failed to read %s since fqdn not found", file); dError("failed to read %s since fqdn not found", file);
goto PRASE_MNODE_OVER; goto _OVER;
} }
tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN); tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN);
cJSON *port = cJSON_GetObjectItem(node, "port"); cJSON *port = cJSON_GetObjectItem(node, "port");
if (!port || port->type != cJSON_Number) { if (!port || port->type != cJSON_Number) {
dError("failed to read %s since port not found", file); dError("failed to read %s since port not found", file);
goto PRASE_MNODE_OVER; goto _OVER;
} }
pReplica->port = port->valueint; pReplica->port = port->valueint;
} }
@ -96,7 +96,7 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
code = 0; code = 0;
dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed); dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed);
PRASE_MNODE_OVER: _OVER:
if (content != NULL) taosMemoryFree(content); if (content != NULL) taosMemoryFree(content);
if (root != NULL) cJSON_Delete(root); if (root != NULL) cJSON_Delete(root);
if (pFile != NULL) taosCloseFile(&pFile); if (pFile != NULL) taosCloseFile(&pFile);

View File

@ -161,9 +161,7 @@ static int32_t mmOpen(SMgmtWrapper *pWrapper) {
SMnodeOpt option = {0}; SMnodeOpt option = {0};
if (!deployed) { if (!deployed) {
dInfo("mnode start to deploy"); dInfo("mnode start to deploy");
// if (pWrapper->procType == DND_PROC_CHILD) { pWrapper->pDnode->data.dnodeId = 1;
pWrapper->pDnode->data.dnodeId = 1;
// }
mmBuildOptionForDeploy(pMgmt, &option); mmBuildOptionForDeploy(pMgmt, &option);
} else { } else {
dInfo("mnode start to open"); dInfo("mnode start to open");

View File

@ -17,42 +17,48 @@
#include "mmInt.h" #include "mmInt.h"
static inline void mmSendRsp(SNodeMsg *pMsg, int32_t code) { static inline void mmSendRsp(SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, SRpcMsg rsp = {
.ahandle = pMsg->rpcMsg.ahandle, .handle = pMsg->rpcMsg.handle,
.refId = pMsg->rpcMsg.refId, .ahandle = pMsg->rpcMsg.ahandle,
.code = code, .refId = pMsg->rpcMsg.refId,
.pCont = pMsg->pRsp, .code = code,
.contLen = pMsg->rspLen}; .pCont = pMsg->pRsp,
.contLen = pMsg->rspLen,
};
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle; SMnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType;
dTrace("msg:%p, get from mnode queue", pMsg); dTrace("msg:%p, get from mnode queue", pMsg);
SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1;
if (pMsg->rpcMsg.msgType == TDMT_DND_ALTER_MNODE) { switch (msgType) {
code = mmProcessAlterReq(pMgmt, pMsg); case TDMT_DND_ALTER_MNODE:
} else if (pMsg->rpcMsg.msgType == TDMT_MON_MM_INFO) { code = mmProcessAlterReq(pMgmt, pMsg);
code = mmProcessGetMonMmInfoReq(pMgmt->pWrapper, pMsg); break;
} else if (pMsg->rpcMsg.msgType == TDMT_MON_MM_LOAD) { case TDMT_MON_MM_INFO:
code = mmProcessGetMnodeLoadsReq(pMgmt->pWrapper, pMsg); code = mmProcessGetMonMmInfoReq(pMgmt->pWrapper, pMsg);
} else { break;
pMsg->pNode = pMgmt->pMnode; case TDMT_MON_MM_LOAD:
code = mndProcessMsg(pMsg); code = mmProcessGetMnodeLoadsReq(pMgmt->pWrapper, pMsg);
break;
default:
pMsg->pNode = pMgmt->pMnode;
code = mndProcessMsg(pMsg);
} }
if (pRpc->msgType & 1U) { if (msgType & 1U) {
if (pRpc->handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (pMsg->rpcMsg.handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
if (code != 0 && terrno != 0) code = terrno; if (code != 0 && terrno != 0) code = terrno;
mmSendRsp(pMsg, code); mmSendRsp(pMsg, code);
} }
} }
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pRpc->pCont); rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
@ -78,38 +84,38 @@ static void mmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
static void mmPutMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) { static void mmPutNodeMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) {
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
taosWriteQitem(pWorker->queue, pMsg); taosWriteQitem(pWorker->queue, pMsg);
} }
int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt; SMnodeMgmt *pMgmt = pWrapper->pMgmt;
mmPutMsgToWorker(&pMgmt->writeWorker, pMsg); mmPutNodeMsgToWorker(&pMgmt->writeWorker, pMsg);
return 0; return 0;
} }
int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt; SMnodeMgmt *pMgmt = pWrapper->pMgmt;
mmPutMsgToWorker(&pMgmt->syncWorker, pMsg); mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg);
return 0; return 0;
} }
int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt; SMnodeMgmt *pMgmt = pWrapper->pMgmt;
mmPutMsgToWorker(&pMgmt->readWorker, pMsg); mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg);
return 0; return 0;
} }
int32_t mmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t mmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt; SMnodeMgmt *pMgmt = pWrapper->pMgmt;
mmPutMsgToWorker(&pMgmt->queryWorker, pMsg); mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg);
return 0; return 0;
} }
int32_t mmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t mmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt; SMnodeMgmt *pMgmt = pWrapper->pMgmt;
mmPutMsgToWorker(&pMgmt->monitorWorker, pMsg); mmPutNodeMsgToWorker(&pMgmt->monitorWorker, pMsg);
return 0; return 0;
} }
@ -144,40 +150,62 @@ int32_t mmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
} }
int32_t mmStartWorker(SMnodeMgmt *pMgmt) { int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
SSingleWorkerCfg qCfg = {.min = tsNumOfMnodeQueryThreads, SSingleWorkerCfg qCfg = {
.max = tsNumOfMnodeQueryThreads, .min = tsNumOfMnodeQueryThreads,
.name = "mnode-query", .max = tsNumOfMnodeQueryThreads,
.fp = (FItem)mmProcessQueryQueue, .name = "mnode-query",
.param = pMgmt}; .fp = (FItem)mmProcessQueryQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) { if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) {
dError("failed to start mnode-query worker since %s", terrstr()); dError("failed to start mnode-query worker since %s", terrstr());
return -1; return -1;
} }
SSingleWorkerCfg rCfg = {.min = tsNumOfMnodeReadThreads, SSingleWorkerCfg rCfg = {
.max = tsNumOfMnodeReadThreads, .min = tsNumOfMnodeReadThreads,
.name = "mnode-read", .max = tsNumOfMnodeReadThreads,
.fp = (FItem)mmProcessQueue, .name = "mnode-read",
.param = pMgmt}; .fp = (FItem)mmProcessQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->readWorker, &rCfg) != 0) { if (tSingleWorkerInit(&pMgmt->readWorker, &rCfg) != 0) {
dError("failed to start mnode-read worker since %s", terrstr()); dError("failed to start mnode-read worker since %s", terrstr());
return -1; return -1;
} }
SSingleWorkerCfg wCfg = {.min = 1, .max = 1, .name = "mnode-write", .fp = (FItem)mmProcessQueue, .param = pMgmt}; SSingleWorkerCfg wCfg = {
.min = 1,
.max = 1,
.name = "mnode-write",
.fp = (FItem)mmProcessQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->writeWorker, &wCfg) != 0) { if (tSingleWorkerInit(&pMgmt->writeWorker, &wCfg) != 0) {
dError("failed to start mnode-write worker since %s", terrstr()); dError("failed to start mnode-write worker since %s", terrstr());
return -1; return -1;
} }
SSingleWorkerCfg sCfg = {.min = 1, .max = 1, .name = "mnode-sync", .fp = (FItem)mmProcessQueue, .param = pMgmt}; SSingleWorkerCfg sCfg = {
.min = 1,
.max = 1,
.name = "mnode-sync",
.fp = (FItem)mmProcessQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) { if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) {
dError("failed to start mnode mnode-sync worker since %s", terrstr()); dError("failed to start mnode mnode-sync worker since %s", terrstr());
return -1; return -1;
} }
if (tsMultiProcess) { if (tsMultiProcess) {
SSingleWorkerCfg mCfg = {.min = 1, .max = 1, .name = "mnode-monitor", .fp = (FItem)mmProcessQueue, .param = pMgmt}; SSingleWorkerCfg mCfg = {
.min = 1,
.max = 1,
.name = "mnode-monitor",
.fp = (FItem)mmProcessQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
dError("failed to start mnode mnode-monitor worker since %s", terrstr()); dError("failed to start mnode mnode-monitor worker since %s", terrstr());
return -1; return -1;

View File

@ -17,12 +17,14 @@
#include "qmInt.h" #include "qmInt.h"
static inline void qmSendRsp(SNodeMsg *pMsg, int32_t code) { static inline void qmSendRsp(SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, SRpcMsg rsp = {
.ahandle = pMsg->rpcMsg.ahandle, .handle = pMsg->rpcMsg.handle,
.refId = pMsg->rpcMsg.refId, .ahandle = pMsg->rpcMsg.ahandle,
.code = code, .refId = pMsg->rpcMsg.refId,
.pCont = pMsg->pRsp, .code = code,
.contLen = pMsg->rspLen}; .pCont = pMsg->pRsp,
.contLen = pMsg->rspLen,
};
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
@ -145,22 +147,26 @@ int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
} }
int32_t qmStartWorker(SQnodeMgmt *pMgmt) { int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
SSingleWorkerCfg queryCfg = {.min = tsNumOfVnodeQueryThreads, SSingleWorkerCfg queryCfg = {
.max = tsNumOfVnodeQueryThreads, .min = tsNumOfVnodeQueryThreads,
.name = "qnode-query", .max = tsNumOfVnodeQueryThreads,
.fp = (FItem)qmProcessQueryQueue, .name = "qnode-query",
.param = pMgmt}; .fp = (FItem)qmProcessQueryQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg) != 0) { if (tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg) != 0) {
dError("failed to start qnode-query worker since %s", terrstr()); dError("failed to start qnode-query worker since %s", terrstr());
return -1; return -1;
} }
SSingleWorkerCfg fetchCfg = {.min = tsNumOfQnodeFetchThreads, SSingleWorkerCfg fetchCfg = {
.max = tsNumOfQnodeFetchThreads, .min = tsNumOfQnodeFetchThreads,
.name = "qnode-fetch", .max = tsNumOfQnodeFetchThreads,
.fp = (FItem)qmProcessFetchQueue, .name = "qnode-fetch",
.param = pMgmt}; .fp = (FItem)qmProcessFetchQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->fetchWorker, &fetchCfg) != 0) { if (tSingleWorkerInit(&pMgmt->fetchWorker, &fetchCfg) != 0) {
dError("failed to start qnode-fetch worker since %s", terrstr()); dError("failed to start qnode-fetch worker since %s", terrstr());
@ -169,7 +175,12 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
if (tsMultiProcess) { if (tsMultiProcess) {
SSingleWorkerCfg mCfg = { SSingleWorkerCfg mCfg = {
.min = 1, .max = 1, .name = "qnode-monitor", .fp = (FItem)qmProcessMonitorQueue, .param = pMgmt}; .min = 1,
.max = 1,
.name = "qnode-monitor",
.fp = (FItem)qmProcessMonitorQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
dError("failed to start qnode-monitor worker since %s", terrstr()); dError("failed to start qnode-monitor worker since %s", terrstr());
return -1; return -1;

View File

@ -17,12 +17,14 @@
#include "smInt.h" #include "smInt.h"
static inline void smSendRsp(SNodeMsg *pMsg, int32_t code) { static inline void smSendRsp(SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, SRpcMsg rsp = {
.ahandle = pMsg->rpcMsg.ahandle, .handle = pMsg->rpcMsg.handle,
.refId = pMsg->rpcMsg.refId, .ahandle = pMsg->rpcMsg.ahandle,
.code = code, .refId = pMsg->rpcMsg.refId,
.pCont = pMsg->pRsp, .code = code,
.contLen = pMsg->rspLen}; .pCont = pMsg->pRsp,
.contLen = pMsg->rspLen,
};
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
@ -90,7 +92,12 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
return -1; return -1;
} }
SMultiWorkerCfg cfg = {.max = 1, .name = "snode-unique", .fp = smProcessUniqueQueue, .param = pMgmt}; SMultiWorkerCfg cfg = {
.max = 1,
.name = "snode-unique",
.fp = smProcessUniqueQueue,
.param = pMgmt,
};
if (tMultiWorkerInit(pUniqueWorker, &cfg) != 0) { if (tMultiWorkerInit(pUniqueWorker, &cfg) != 0) {
dError("failed to start snode-unique worker since %s", terrstr()); dError("failed to start snode-unique worker since %s", terrstr());
return -1; return -1;
@ -101,11 +108,13 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
} }
} }
SSingleWorkerCfg cfg = {.min = tsNumOfSnodeSharedThreads, SSingleWorkerCfg cfg = {
.max = tsNumOfSnodeSharedThreads, .min = tsNumOfSnodeSharedThreads,
.name = "snode-shared", .max = tsNumOfSnodeSharedThreads,
.fp = (FItem)smProcessSharedQueue, .name = "snode-shared",
.param = pMgmt}; .fp = (FItem)smProcessSharedQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->sharedWorker, &cfg)) { if (tSingleWorkerInit(&pMgmt->sharedWorker, &cfg)) {
dError("failed to start snode shared-worker since %s", terrstr()); dError("failed to start snode shared-worker since %s", terrstr());
@ -114,7 +123,12 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
if (tsMultiProcess) { if (tsMultiProcess) {
SSingleWorkerCfg mCfg = { SSingleWorkerCfg mCfg = {
.min = 1, .max = 1, .name = "snode-monitor", .fp = (FItem)smProcessMonitorQueue, .param = pMgmt}; .min = 1,
.max = 1,
.name = "snode-monitor",
.fp = (FItem)smProcessMonitorQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
dError("failed to start snode-monitor worker since %s", terrstr()); dError("failed to start snode-monitor worker since %s", terrstr());
return -1; return -1;
@ -150,7 +164,7 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) {
} }
int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SSnodeMgmt * pMgmt = pWrapper->pMgmt; SSnodeMgmt *pMgmt = pWrapper->pMgmt;
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0); SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0);
if (pWorker == NULL) { if (pWorker == NULL) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
@ -163,7 +177,7 @@ int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SSnodeMgmt * pMgmt = pWrapper->pMgmt; SSnodeMgmt *pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->monitorWorker; SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
@ -172,7 +186,7 @@ int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SSnodeMgmt * pMgmt = pWrapper->pMgmt; SSnodeMgmt *pMgmt = pWrapper->pMgmt;
int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg); int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg);
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index); SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index);
if (pWorker == NULL) { if (pWorker == NULL) {
@ -186,7 +200,7 @@ int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SSnodeMgmt * pMgmt = pWrapper->pMgmt; SSnodeMgmt *pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->sharedWorker; SSingleWorker *pWorker = &pMgmt->sharedWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);

View File

@ -335,7 +335,6 @@ static int32_t mndProcessStatusReq(SNodeMsg *pReq) {
} }
bool roleChanged = false; bool roleChanged = false;
for (int32_t vg = 0; vg < pVgroup->replica; ++vg) { for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
// sync integration
if (pVgroup->vnodeGid[vg].dnodeId == statusReq.dnodeId) { if (pVgroup->vnodeGid[vg].dnodeId == statusReq.dnodeId) {
if (pVgroup->vnodeGid[vg].role != pVload->syncState) { if (pVgroup->vnodeGid[vg].role != pVload->syncState) {
roleChanged = true; roleChanged = true;

View File

@ -1405,15 +1405,18 @@ static int32_t mndRetrieveTrans(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB
colDataAppend(pColInfo, numOfRows, (const char *)dbname, false); colDataAppend(pColInfo, numOfRows, (const char *)dbname, false);
char type[TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE] = {0}; char type[TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndTransType(pTrans->type), pShow->pMeta->pSchemas[cols].bytes); STR_WITH_MAXSIZE_TO_VARSTR(type, mndTransType(pTrans->type), pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)type, false); colDataAppend(pColInfo, numOfRows, (const char *)type, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->failedTimes, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->lastExecTime, false); colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->lastExecTime, false);
char lastError[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0}; char lastError[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(dbname, pTrans->lastError, pShow->pMeta->pSchemas[cols].bytes); STR_WITH_MAXSIZE_TO_VARSTR(lastError, pTrans->lastError, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)lastError, false); colDataAppend(pColInfo, numOfRows, (const char *)lastError, false);

View File

@ -65,7 +65,7 @@ static void mndPullupTrans(void *param, void *tmrId) {
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
} }
taosTmrReset(mndPullupTrans, tsTransPullupMs, pMnode, pMnode->timer, &pMnode->transTimer); taosTmrReset(mndPullupTrans, tsTransPullupInterval * 1000, pMnode, pMnode->timer, &pMnode->transTimer);
} }
static void mndCalMqRebalance(void *param, void *tmrId) { static void mndCalMqRebalance(void *param, void *tmrId) {
@ -81,7 +81,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) {
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
} }
taosTmrReset(mndCalMqRebalance, tsMaRebalanceMs, pMnode, pMnode->timer, &pMnode->mqTimer); taosTmrReset(mndCalMqRebalance, tsMqRebalanceInterval * 1000, pMnode, pMnode->timer, &pMnode->mqTimer);
} }
static void mndPullupTelem(void *param, void *tmrId) { static void mndPullupTelem(void *param, void *tmrId) {
@ -103,12 +103,12 @@ static int32_t mndInitTimer(SMnode *pMnode) {
return -1; return -1;
} }
if (taosTmrReset(mndPullupTrans, tsTransPullupMs, pMnode, pMnode->timer, &pMnode->transTimer)) { if (taosTmrReset(mndPullupTrans, tsTransPullupInterval * 1000, pMnode, pMnode->timer, &pMnode->transTimer)) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
if (taosTmrReset(mndCalMqRebalance, tsMaRebalanceMs, pMnode, pMnode->timer, &pMnode->mqTimer)) { if (taosTmrReset(mndCalMqRebalance, tsMqRebalanceInterval * 1000, pMnode, pMnode->timer, &pMnode->mqTimer)) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }

View File

@ -58,7 +58,7 @@ class MndTestTrans2 : public ::testing::Test {
strcpy(opt.replicas[0].fqdn, "localhost"); strcpy(opt.replicas[0].fqdn, "localhost");
opt.msgCb = msgCb; opt.msgCb = msgCb;
tsTransPullupMs = 1000; tsTransPullupInterval = 1;
const char *mnodepath = "/tmp/mnode_test_trans"; const char *mnodepath = "/tmp/mnode_test_trans";
taosRemoveDir(mnodepath); taosRemoveDir(mnodepath);

View File

@ -157,7 +157,7 @@ struct SVnodeCfg {
bool isWeak; bool isWeak;
STsdbCfg tsdbCfg; STsdbCfg tsdbCfg;
SWalCfg walCfg; SWalCfg walCfg;
SSyncCfg syncCfg; // sync integration SSyncCfg syncCfg;
uint32_t hashBegin; uint32_t hashBegin;
uint32_t hashEnd; uint32_t hashEnd;
int8_t hashMethod; int8_t hashMethod;

View File

@ -879,14 +879,14 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order,
} }
} }
static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow,
static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow, TDRowVerT maxVer) { TDRowVerT maxVer) {
STSRow *rmem = NULL, *rimem = NULL; STSRow *rmem = NULL, *rimem = NULL;
if (pCheckInfo->iter) { if (pCheckInfo->iter) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
if (node != NULL) { if (node != NULL) {
rmem = (STSRow*)SL_GET_NODE_DATA(node); rmem = (STSRow*)SL_GET_NODE_DATA(node);
#if 0 // TODO: skiplist refactor #if 0 // TODO: skiplist refactor
if (TD_ROW_VER(rmem) > maxVer) { if (TD_ROW_VER(rmem) > maxVer) {
rmem = NULL; rmem = NULL;
} }
@ -898,7 +898,7 @@ static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
if (node != NULL) { if (node != NULL) {
rimem = (STSRow*)SL_GET_NODE_DATA(node); rimem = (STSRow*)SL_GET_NODE_DATA(node);
#if 0 // TODO: skiplist refactor #if 0 // TODO: skiplist refactor
if (TD_ROW_VER(rimem) > maxVer) { if (TD_ROW_VER(rimem) > maxVer) {
rimem = NULL; rimem = NULL;
} }
@ -1677,7 +1677,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
colIdOfRow2 = tdKvRowColIdAt(row2, k); colIdOfRow2 = tdKvRowColIdAt(row2, k);
} }
if (colIdOfRow1 < colIdOfRow2) { // the most probability if (colIdOfRow1 < colIdOfRow2) { // the most probability
if (colIdOfRow1 < pColInfo->info.colId) { if (colIdOfRow1 < pColInfo->info.colId) {
++j; ++j;
continue; continue;
@ -1720,7 +1720,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
++(*curRow); ++(*curRow);
} }
++nResult; ++nResult;
} else if (update){ } else if (update) {
mergeOption = 2; mergeOption = 2;
} else { } else {
mergeOption = 0; mergeOption = 0;
@ -1741,7 +1741,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
++(*curRow); ++(*curRow);
} }
++nResult; ++nResult;
} else if(update) { } else if (update) {
mergeOption = 2; mergeOption = 2;
} else { } else {
mergeOption = 0; mergeOption = 0;
@ -2018,9 +2018,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
rv2 = TD_ROW_SVER(row2); rv2 = TD_ROW_SVER(row2);
} }
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, numOfRows +=
pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend); mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
// numOfRows += 1; pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key; cur->win.skey = key;
} }
@ -2028,7 +2028,6 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
cur->win.ekey = key; cur->win.ekey = key;
cur->lastKey = key + step; cur->lastKey = key + step;
cur->mixBlock = true; cur->mixBlock = true;
moveToNextRowInMem(pCheckInfo); moveToNextRowInMem(pCheckInfo);
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it } else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
#if 0 #if 0
@ -2064,7 +2063,11 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
} }
#endif #endif
if (TD_SUPPORT_UPDATE(pCfg->update)) { if (TD_SUPPORT_UPDATE(pCfg->update)) {
doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos); if (lastKeyAppend != key) {
lastKeyAppend = key;
++curRow;
}
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos);
if (rv1 != TD_ROW_SVER(row1)) { if (rv1 != TD_ROW_SVER(row1)) {
// pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1)); // pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
@ -2074,10 +2077,10 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
// pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2)); // pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
rv2 = TD_ROW_SVER(row2); rv2 = TD_ROW_SVER(row2);
} }
numOfRows +=
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
// ++numOfRows;
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key; cur->win.skey = key;
} }
@ -2117,15 +2120,20 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
int32_t qstart = 0, qend = 0; int32_t qstart = 0, qend = 0;
getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend); getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend);
lastKeyAppend = tsArray[qend];
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, qstart, qend); if ((lastKeyAppend != TSKEY_INITIAL_VAL) &&
(lastKeyAppend != (ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qstart] : tsArray[qend]))) {
++curRow;
}
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, qstart, qend);
pos += (qend - qstart + 1) * step; pos += (qend - qstart + 1) * step;
if(numOfRows > 0) { if (numOfRows > 0) {
curRow = numOfRows - 1; curRow = numOfRows - 1;
} }
cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qend] : tsArray[qstart]; cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qend] : tsArray[qstart];
cur->lastKey = cur->win.ekey + step; cur->lastKey = cur->win.ekey + step;
lastKeyAppend = cur->win.ekey;
} }
} while (numOfRows < pTsdbReadHandle->outputCapacity); } while (numOfRows < pTsdbReadHandle->outputCapacity);
@ -2429,7 +2437,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi
int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb); STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb);
STimeWindow win = TSWINDOW_INITIALIZER; STimeWindow win = TSWINDOW_INITIALIZER;
while (true) { while (true) {
tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb)); tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
@ -2735,7 +2743,6 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
STSchema* pSchema = NULL; STSchema* pSchema = NULL;
TSKEY lastRowKey = TSKEY_INITIAL_VAL; TSKEY lastRowKey = TSKEY_INITIAL_VAL;
do { do {
STSRow* row = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, NULL, TD_VER_MAX); STSRow* row = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, NULL, TD_VER_MAX);
if (row == NULL) { if (row == NULL) {
@ -2760,8 +2767,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 0); pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 0);
rv = TD_ROW_SVER(row); rv = TD_ROW_SVER(row);
} }
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema, numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId,
NULL, pCfg->update, &lastRowKey); pSchema, NULL, pCfg->update, &lastRowKey);
if (numOfRows >= maxRowsToRead) { if (numOfRows >= maxRowsToRead) {
moveToNextRowInMem(pCheckInfo); moveToNextRowInMem(pCheckInfo);
@ -2770,7 +2777,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
} while (moveToNextRowInMem(pCheckInfo)); } while (moveToNextRowInMem(pCheckInfo));
taosMemoryFreeClear(pSchema); // free the STSChema taosMemoryFreeClear(pSchema); // free the STSChema
assert(numOfRows <= maxRowsToRead); assert(numOfRows <= maxRowsToRead);
@ -2898,8 +2905,8 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
// if (ret != TSDB_CODE_SUCCESS) { // if (ret != TSDB_CODE_SUCCESS) {
// return false; // return false;
// } // }
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, pRow, NULL, numOfCols, pCheckInfo->tableId, mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, pRow, NULL, numOfCols,
NULL, NULL, true, &lastRowKey); pCheckInfo->tableId, NULL, NULL, true, &lastRowKey);
taosMemoryFreeClear(pRow); taosMemoryFreeClear(pRow);
// update the last key value // update the last key value
@ -3468,7 +3475,7 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa
pDataBlockInfo->rows = cur->rows; pDataBlockInfo->rows = cur->rows;
pDataBlockInfo->window = cur->win; pDataBlockInfo->window = cur->win;
// ASSERT(pDataBlockInfo->numOfCols >= (int32_t)(QH_GET_NUM_OF_COLS(pHandle)); // ASSERT(pDataBlockInfo->numOfCols >= (int32_t)(QH_GET_NUM_OF_COLS(pHandle));
} }
/* /*

View File

@ -97,7 +97,6 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
if (tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
// sync integration
if (tjsonAddIntegerToObject(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex) < 0) return -1;
SJson *pNodeInfoArr = tjsonCreateArray(); SJson *pNodeInfoArr = tjsonCreateArray();
@ -157,7 +156,6 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
if (tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1; if (tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
if (tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1; if (tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
// sync integration
if (tjsonGetNumberValue(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum) < 0) return -1; if (tjsonGetNumberValue(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum) < 0) return -1;
if (tjsonGetNumberValue(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex) < 0) return -1; if (tjsonGetNumberValue(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex) < 0) return -1;

View File

@ -124,8 +124,7 @@ _exit:
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
pLoad->vgId = TD_VID(pVnode); pLoad->vgId = TD_VID(pVnode);
// pLoad->syncState = TAOS_SYNC_STATE_LEADER; pLoad->syncState = syncGetMyRole(pVnode->sync);
pLoad->syncState = syncGetMyRole(pVnode->sync); // sync integration
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
pLoad->numOfTimeSeries = 400; pLoad->numOfTimeSeries = 400;
pLoad->totalStorage = 300; pLoad->totalStorage = 300;

View File

@ -198,7 +198,6 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
tsdbInsertTSmaData(((SVnode *)pVnode)->pTsdb, smaId, (const char *)data); tsdbInsertTSmaData(((SVnode *)pVnode)->pTsdb, smaId, (const char *)data);
} }
// sync integration
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
if (syncEnvIsStart()) { if (syncEnvIsStart()) {
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);

View File

@ -14,12 +14,6 @@
*/ */
#include "vnd.h" #include "vnd.h"
// #include "sync.h"
// #include "syncTools.h"
// #include "tmsgcb.h"
// #include "vnodeInt.h"
// sync integration
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
SSyncInfo syncInfo; SSyncInfo syncInfo;

View File

@ -1259,7 +1259,9 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
return false; return false;
} }
UdfcFuncHandle handle; UdfcFuncHandle handle;
if (setupUdf((char*)pCtx->udfName, &handle) != 0) { int32_t udfCode = 0;
if ((udfCode = setupUdf((char*)pCtx->udfName, &handle)) != 0) {
fnError("udfAggInit error. step setupUdf. udf code: %d", udfCode);
return false; return false;
} }
SClientUdfUvSession *session = (SClientUdfUvSession *)handle; SClientUdfUvSession *session = (SClientUdfUvSession *)handle;
@ -1272,7 +1274,8 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
udfRes->session = (SClientUdfUvSession *)handle; udfRes->session = (SClientUdfUvSession *)handle;
SUdfInterBuf buf = {0}; SUdfInterBuf buf = {0};
if (callUdfAggInit(handle, &buf) != 0) { if ((udfCode = callUdfAggInit(handle, &buf)) != 0) {
fnError("udfAggInit error. step callUdfAggInit. udf code: %d", udfCode);
return false; return false;
} }
udfRes->interResNum = buf.numOfResult; udfRes->interResNum = buf.numOfResult;
@ -1316,21 +1319,23 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
.numOfResult = udfRes->interResNum}; .numOfResult = udfRes->interResNum};
SUdfInterBuf newState = {0}; SUdfInterBuf newState = {0};
callUdfAggProcess(session, inputBlock, &state, &newState); int32_t udfCode = callUdfAggProcess(session, inputBlock, &state, &newState);
if (udfCode != 0) {
udfRes->interResNum = newState.numOfResult; fnError("udfAggProcess error. code: %d", udfCode);
memcpy(udfRes->interResBuf, newState.buf, newState.bufLen); newState.numOfResult = 0;
} else {
udfRes->interResNum = newState.numOfResult;
memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
}
if (newState.numOfResult == 1 || state.numOfResult == 1) { if (newState.numOfResult == 1 || state.numOfResult == 1) {
GET_RES_INFO(pCtx)->numOfRes = 1; GET_RES_INFO(pCtx)->numOfRes = 1;
} }
blockDataDestroy(inputBlock); blockDataDestroy(inputBlock);
taosArrayDestroy(tempBlock.pDataBlock); taosArrayDestroy(tempBlock.pDataBlock);
taosMemoryFree(newState.buf); taosMemoryFree(newState.buf);
return 0; return TSDB_CODE_SUCCESS;
} }
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
@ -1344,15 +1349,22 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
SUdfInterBuf state = {.buf = udfRes->interResBuf, SUdfInterBuf state = {.buf = udfRes->interResBuf,
.bufLen = session->bufSize, .bufLen = session->bufSize,
.numOfResult = udfRes->interResNum}; .numOfResult = udfRes->interResNum};
callUdfAggFinalize(session, &state, &resultBuf); int32_t udfCallCode= 0;
udfCallCode= callUdfAggFinalize(session, &state, &resultBuf);
udfRes->finalResBuf = resultBuf.buf; if (udfCallCode!= 0) {
udfRes->finalResNum = resultBuf.numOfResult; fnError("udfAggFinalize error. callUdfAggFinalize step. udf code:%d", udfCallCode);
GET_RES_INFO(pCtx)->numOfRes = 0;
teardownUdf(session); } else {
memcpy(udfRes->finalResBuf, resultBuf.buf, session->outputLen);
if (resultBuf.numOfResult == 1) { udfRes->finalResNum = resultBuf.numOfResult;
GET_RES_INFO(pCtx)->numOfRes = 1; GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
} }
int32_t code = teardownUdf(session);
if (code != 0) {
fnError("udfAggFinalize error. teardownUdf step. udf code: %d", code);
}
return functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf); return functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
} }

View File

@ -347,10 +347,21 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp
if (fmIsUserDefinedFunc(node->funcId)) { if (fmIsUserDefinedFunc(node->funcId)) {
UdfcFuncHandle udfHandle = NULL; UdfcFuncHandle udfHandle = NULL;
SCL_ERR_JRET(setupUdf(node->functionName, &udfHandle)); code = setupUdf(node->functionName, &udfHandle);
if (code != 0) {
sclError("fmExecFunction error. setupUdf. function name: %s, code:%d", node->functionName, code);
goto _return;
}
code = callUdfScalarFunc(udfHandle, params, paramNum, output); code = callUdfScalarFunc(udfHandle, params, paramNum, output);
teardownUdf(udfHandle); if (code != 0) {
SCL_ERR_JRET(code); sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
goto _return;
}
code = teardownUdf(udfHandle);
if (code != 0) {
sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
goto _return;
}
} else { } else {
SScalarFuncExecFuncs ffpSet = {0}; SScalarFuncExecFuncs ffpSet = {0};
code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet); code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);

View File

@ -62,6 +62,8 @@
# ---- tstream # ---- tstream
./test.sh -f tsim/tstream/basic0.sim ./test.sh -f tsim/tstream/basic0.sim
# ---- transaction
./test.sh -f tsim/trans/create_db.sim
# ---- tmq # ---- tmq
./test.sh -f tsim/tmq/basic1.sim ./test.sh -f tsim/tmq/basic1.sim

View File

@ -0,0 +1,166 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/cfg.sh -n dnode1 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode2 -c transPullupInterval -v 1
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
sql connect
print =============== show dnodes
sql show dnodes;
if $rows != 1 then
return -1
endi
if $data00 != 1 then
return -1
endi
sql show mnodes;
if $rows != 1 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data02 != LEADER then
return -1
endi
print =============== create dnodes
sql create dnode $hostname port 7200
sleep 2000
sql show dnodes;
if $rows != 2 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data10 != 2 then
return -1
endi
print =============== kill dnode2
system sh/exec.sh -n dnode2 -s stop -x SIGINT
print =============== create database
sql show transactions
if $rows != 0 then
return -1
endi
sql_error create database d1 vgroups 2;
print =============== show transactions
sql show transactions
if $rows != 1 then
return -1
endi
if $data[0][0] != 2 then
return -1
endi
if $data[0][2] != undoAction then
return -1
endi
if $data[0][3] != d1 then
return -1
endi
if $data[0][4] != create-db then
return -1
endi
if $data[0][7] != @Unable to establish connection@ then
return -1
endi
sql_error create database d1 vgroups 2;
print =============== start dnode2
system sh/exec.sh -n dnode2 -s start
sleep 3000
sql show transactions
if $rows != 0 then
return -1
endi
sql create database d1 vgroups 2;
print =============== kill dnode2
system sh/exec.sh -n dnode2 -s stop -x SIGINT
print =============== create database
sql show transactions
if $rows != 0 then
return -1
endi
sql_error create database d2 vgroups 2;
print =============== show transactions
sql show transactions
if $rows != 1 then
return -1
endi
if $data[0][0] != 4 then
return -1
endi
if $data[0][2] != undoAction then
return -1
endi
if $data[0][3] != d2 then
return -1
endi
if $data[0][4] != create-db then
return -1
endi
if $data[0][7] != @Unable to establish connection@ then
return -1
endi
sql_error create database d2 vgroups 2;
print =============== kill transaction
sql kill transaction 4;
sleep 2000
sql show transactions
if $rows != 0 then
return -1
endi
print =============== start dnode2
system sh/exec.sh -n dnode2 -s start
sleep 3000
sql show transactions
if $rows != 0 then
return -1
endi
sql create database d2 vgroups 2;
sql_error kill transaction 1;
sql_error kill transaction 2;
sql_error kill transaction 3;
sql_error kill transaction 4;
sql_error kill transaction 5;
return
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT

View File

@ -0,0 +1,311 @@
import taos
import sys
import time
import socket
import pexpect
import os
import http.server
import gzip
import threading
import json
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
telemetryPort = '6043'
def telemetryInfoCheck(infoDict=''):
hostname = socket.gethostname()
serverPort = 7080
if "ts" not in infoDict or len(infoDict["ts"]) == 0:
tdLog.exit("ts is null!")
if "dnode_id" not in infoDict or infoDict["dnode_id"] != 1:
tdLog.exit("dnode_id is null!")
if "dnode_ep" not in infoDict:
tdLog.exit("dnode_ep is null!")
if "cluster_id" not in infoDict:
tdLog.exit("cluster_id is null!")
if "protocol" not in infoDict or infoDict["protocol"] != 1:
tdLog.exit("protocol is null!")
if "cluster_info" not in infoDict :
tdLog.exit("cluster_info is null!")
# cluster_info ====================================
if "first_ep" not in infoDict["cluster_info"] or infoDict["cluster_info"]["first_ep"] == None:
tdLog.exit("first_ep is null!")
if "first_ep_dnode_id" not in infoDict["cluster_info"] or infoDict["cluster_info"]["first_ep_dnode_id"] != 1:
tdLog.exit("first_ep_dnode_id is null!")
if "version" not in infoDict["cluster_info"] or infoDict["cluster_info"]["version"] == None:
tdLog.exit("first_ep_dnode_id is null!")
if "master_uptime" not in infoDict["cluster_info"] or infoDict["cluster_info"]["master_uptime"] == None:
tdLog.exit("master_uptime is null!")
if "monitor_interval" not in infoDict["cluster_info"] or infoDict["cluster_info"]["monitor_interval"] !=5:
tdLog.exit("monitor_interval is null!")
if "vgroups_total" not in infoDict["cluster_info"] or infoDict["cluster_info"]["vgroups_total"] < 0:
tdLog.exit("vgroups_total is null!")
if "vgroups_alive" not in infoDict["cluster_info"] or infoDict["cluster_info"]["vgroups_alive"] < 0:
tdLog.exit("vgroups_alive is null!")
if "connections_total" not in infoDict["cluster_info"] or infoDict["cluster_info"]["connections_total"] < 0 :
tdLog.exit("connections_total is null!")
if "dnodes" not in infoDict["cluster_info"] or infoDict["cluster_info"]["dnodes"] == None :
tdLog.exit("dnodes is null!")
dnodes_info = { "dnode_id": 1,"dnode_ep": f"{hostname}:{serverPort}","status":"ready"}
for k ,v in dnodes_info.items():
if k not in infoDict["cluster_info"]["dnodes"][0] or v != infoDict["cluster_info"]["dnodes"][0][k] :
tdLog.exit("dnodes info is null!")
mnodes_info = { "mnode_id":1, "mnode_ep":f"{hostname}:{serverPort}","role": "LEADER" }
for k ,v in mnodes_info.items():
if k not in infoDict["cluster_info"]["mnodes"][0] or v != infoDict["cluster_info"]["mnodes"][0][k] :
tdLog.exit("mnodes info is null!")
# vgroup_infos ====================================
if "vgroup_infos" not in infoDict or infoDict["vgroup_infos"]== None:
tdLog.exit("vgroup_infos is null!")
vgroup_infos_nums = len(infoDict["vgroup_infos"])
for index in range(vgroup_infos_nums):
if "vgroup_id" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["vgroup_id"]<0:
tdLog.exit("vgroup_id is null!")
if "database_name" not in infoDict["vgroup_infos"][index] or len(infoDict["vgroup_infos"][index]["database_name"]) < 0:
tdLog.exit("database_name is null!")
if "tables_num" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["tables_num"]!= 0:
tdLog.exit("tables_num is null!")
if "status" not in infoDict["vgroup_infos"][index] or len(infoDict["vgroup_infos"][index]["status"]) < 0 :
tdLog.exit("status is null!")
if "vnodes" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["vnodes"] ==None :
tdLog.exit("vnodes is null!")
if "dnode_id" not in infoDict["vgroup_infos"][index]["vnodes"][0] or infoDict["vgroup_infos"][index]["vnodes"][0]["dnode_id"] < 0 :
tdLog.exit("vnodes is null!")
# grant_info ====================================
if "grant_info" not in infoDict or infoDict["grant_info"]== None:
tdLog.exit("grant_info is null!")
if "expire_time" not in infoDict["grant_info"] or not infoDict["grant_info"]["expire_time"] > 0:
tdLog.exit("expire_time is null!")
if "timeseries_used" not in infoDict["grant_info"] or not infoDict["grant_info"]["timeseries_used"] > 0:
tdLog.exit("timeseries_used is null!")
if "timeseries_total" not in infoDict["grant_info"] or not infoDict["grant_info"]["timeseries_total"] > 0:
tdLog.exit("timeseries_total is null!")
# dnode_info ====================================
if "dnode_info" not in infoDict or infoDict["dnode_info"]== None:
tdLog.exit("dnode_info is null!")
dnode_infos = ['uptime', 'cpu_engine', 'cpu_system', 'cpu_cores', 'mem_engine', 'mem_system', 'mem_total', 'disk_engine',
'disk_used', 'disk_total', 'net_in', 'net_out', 'io_read', 'io_write', 'io_read_disk', 'io_write_disk', 'req_select',
'req_select_rate', 'req_insert', 'req_insert_success', 'req_insert_rate', 'req_insert_batch', 'req_insert_batch_success',
'req_insert_batch_rate', 'errors', 'vnodes_num', 'masters', 'has_mnode', 'has_qnode', 'has_snode', 'has_bnode']
for elem in dnode_infos:
if elem not in infoDict["dnode_info"] or infoDict["dnode_info"][elem] < 0:
tdLog.exit(f"{elem} is null!")
# dnode_info ====================================
if "disk_infos" not in infoDict or infoDict["disk_infos"]== None:
tdLog.exit("disk_infos is null!")
# bug for data_dir
if "datadir" not in infoDict["disk_infos"] or len(infoDict["disk_infos"]["datadir"]) <=0 :
tdLog.exit("datadir is null!")
if "name" not in infoDict["disk_infos"]["datadir"][0] or len(infoDict["disk_infos"]["datadir"][0]["name"]) <= 0:
tdLog.exit("name is null!")
if "level" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["level"] < 0:
tdLog.exit("level is null!")
if "avail" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["avail"] <= 0:
tdLog.exit("avail is null!")
if "used" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["used"] <= 0:
tdLog.exit("used is null!")
if "total" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["total"] <= 0:
tdLog.exit("total is null!")
if "logdir" not in infoDict["disk_infos"] or infoDict["disk_infos"]["logdir"]== None:
tdLog.exit("logdir is null!")
if "name" not in infoDict["disk_infos"]["logdir"] or len(infoDict["disk_infos"]["logdir"]["name"]) <= 0:
tdLog.exit("name is null!")
if "avail" not in infoDict["disk_infos"]["logdir"] or infoDict["disk_infos"]["logdir"]["avail"] <= 0:
tdLog.exit("avail is null!")
if "used" not in infoDict["disk_infos"]["logdir"] or infoDict["disk_infos"]["logdir"]["used"] <= 0:
tdLog.exit("used is null!")
if "total" not in infoDict["disk_infos"]["logdir"] or infoDict["disk_infos"]["logdir"]["total"] <= 0:
tdLog.exit("total is null!")
if "tempdir" not in infoDict["disk_infos"] or infoDict["disk_infos"]["tempdir"]== None:
tdLog.exit("tempdir is null!")
if "name" not in infoDict["disk_infos"]["tempdir"] or len(infoDict["disk_infos"]["tempdir"]["name"]) <= 0:
tdLog.exit("name is null!")
if "avail" not in infoDict["disk_infos"]["tempdir"] or infoDict["disk_infos"]["tempdir"]["avail"] <= 0:
tdLog.exit("avail is null!")
if "used" not in infoDict["disk_infos"]["tempdir"] or infoDict["disk_infos"]["tempdir"]["used"] <= 0:
tdLog.exit("used is null!")
if "total" not in infoDict["disk_infos"]["tempdir"] or infoDict["disk_infos"]["tempdir"]["total"] <= 0:
tdLog.exit("total is null!")
# log_infos ====================================
if "log_infos" not in infoDict or infoDict["log_infos"]== None:
tdLog.exit("log_infos is null!")
if "logs" not in infoDict["log_infos"] or len(infoDict["log_infos"]["logs"])!= 10:
tdLog.exit("logs is null!")
if "ts" not in infoDict["log_infos"]["logs"][0] or len(infoDict["log_infos"]["logs"][0]["ts"]) <= 10:
tdLog.exit("ts is null!")
if "level" not in infoDict["log_infos"]["logs"][0] or infoDict["log_infos"]["logs"][0]["level"] not in ["error" ,"info" , "debug" ,"trace"]:
tdLog.exit("level is null!")
if "content" not in infoDict["log_infos"]["logs"][0] or len(infoDict["log_infos"]["logs"][0]["ts"]) <= 1:
tdLog.exit("content is null!")
if "summary" not in infoDict["log_infos"] or len(infoDict["log_infos"]["summary"])!= 4:
tdLog.exit("summary is null!")
if "total" not in infoDict["log_infos"]["summary"][0] or infoDict["log_infos"]["summary"][0]["total"] < 0 :
tdLog.exit("total is null!")
if "level" not in infoDict["log_infos"]["summary"][0] or infoDict["log_infos"]["summary"][0]["level"] not in ["error" ,"info" , "debug" ,"trace"]:
tdLog.exit("level is null!")
class RequestHandlerImpl(http.server.BaseHTTPRequestHandler):
def do_GET(self):
"""
process GET request
"""
def do_POST(self):
"""
process POST request
"""
contentEncoding = self.headers["Content-Encoding"]
if contentEncoding == 'gzip':
req_body = self.rfile.read(int(self.headers["Content-Length"]))
plainText = gzip.decompress(req_body).decode()
else:
plainText = self.rfile.read(int(self.headers["Content-Length"])).decode()
print(plainText)
# 1. send response code and header
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.end_headers()
# 2. send response content
#self.wfile.write(("Hello World: " + req_body + "\n").encode("utf-8"))
# 3. check request body info
infoDict = json.loads(plainText)
#print("================")
# print(infoDict)
telemetryInfoCheck(infoDict)
# 4. shutdown the server and exit case
assassin = threading.Thread(target=httpServer.shutdown)
assassin.daemon = True
assassin.start()
print ("==== shutdown http server ====")
class TDTestCase:
hostname = socket.gethostname()
serverPort = '7080'
rpcDebugFlagVal = '143'
clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
clientCfgDict["serverPort"] = serverPort
clientCfgDict["firstEp"] = hostname + ':' + serverPort
clientCfgDict["secondEp"] = hostname + ':' + serverPort
clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
clientCfgDict["fqdn"] = hostname
updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
updatecfgDict["clientCfg"] = clientCfgDict
updatecfgDict["serverPort"] = serverPort
updatecfgDict["firstEp"] = hostname + ':' + serverPort
updatecfgDict["secondEp"] = hostname + ':' + serverPort
updatecfgDict["fqdn"] = hostname
updatecfgDict["monitorFqdn"] = hostname
updatecfgDict["monitorPort"] = '6043'
updatecfgDict["monitor"] = '1'
updatecfgDict["monitorInterval"] = "5"
updatecfgDict["monitorMaxLogs"] = "10"
updatecfgDict["monitorComp"] = "1"
print ("===================: ", updatecfgDict)
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
# time.sleep(2)
vgroups = "30"
sql = "create database db3 vgroups " + vgroups
tdSql.query(sql)
# loop to wait request
httpServer.serve_forever()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
# create http server: bing ip/port , and request processor
serverAddress = ("", int(telemetryPort))
httpServer = http.server.HTTPServer(serverAddress, RequestHandlerImpl)
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -12,6 +12,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import sys import sys
import os
import threading import threading
import multiprocessing as mp import multiprocessing as mp
from numpy.lib.function_base import insert from numpy.lib.function_base import insert
@ -66,14 +67,19 @@ class TDTestCase:
# run case # run case
def run(self): def run(self):
# test base case # # test base case
self.test_case1() # self.test_case1()
tdLog.debug(" LIMIT test_case1 ............ [OK]") # tdLog.debug(" LIMIT test_case1 ............ [OK]")
# test advance case # test case
# self.test_case2() # self.test_case2()
# tdLog.debug(" LIMIT test_case2 ............ [OK]") # tdLog.debug(" LIMIT test_case2 ............ [OK]")
# test case
self.test_case3()
tdLog.debug(" LIMIT test_case3 ............ [OK]")
# stop # stop
def stop(self): def stop(self):
tdSql.close() tdSql.close()
@ -115,11 +121,12 @@ class TDTestCase:
return cur return cur
def new_create_tables(self,dbname,vgroups,stbname,tcountStart,tcountStop): def new_create_tables(self,dbname,vgroups,stbname,tcountStart,tcountStop):
host = "chenhaoran02" host = "localhost"
buildPath = self.getBuildPath() buildPath = self.getBuildPath()
config = buildPath+ "../sim/dnode1/cfg/" config = buildPath+ "../sim/dnode1/cfg/"
tsql=self.newcur(host,config) tsql=self.newcur(host,config)
tsql.execute("drop database if exists %s"%dbname)
tsql.execute("create database %s vgroups %d"%(dbname,vgroups)) tsql.execute("create database %s vgroups %d"%(dbname,vgroups))
tsql.execute("use %s" %dbname) tsql.execute("use %s" %dbname)
tsql.execute("create stable %s(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"%stbname) tsql.execute("create stable %s(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"%stbname)
@ -182,7 +189,52 @@ class TDTestCase:
tdLog.debug("INSERT TABLE DATA ............ [OK]") tdLog.debug("INSERT TABLE DATA ............ [OK]")
return return
def taosBench(self,jsonFile):
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
taosBenchbin = buildPath+ "/build/bin/taosBenchmark"
os.system("%s -f %s -y " %(taosBenchbin,jsonFile))
return
def taosBenchCreate(self,dbname,stbname,vgroups,threadNumbers,count):
# count=50000
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
taosBenchbin = buildPath+ "/build/bin/taosBenchmark"
# insert: create one or mutiple tables per sql and insert multiple rows per sql
tdSql.execute("drop database if exists %s"%dbname)
tdSql.execute("create database %s vgroups %d"%(dbname,vgroups))
tdSql.execute("use %s" %dbname)
threads = []
# threadNumbers=2
for i in range(threadNumbers):
jsonfile="1-insert/Vgroups%d%d.json"%(vgroups,i)
os.system("cp -f 1-insert/manyVgroups.json %s"%(jsonfile))
os.system("sed -i 's/\"name\": \"db\",/\"name\": \"%s%d\",/g' %s"%(dbname,i,jsonfile))
os.system("sed -i 's/\"childtable_count\": 300000,/\"childtable_count\": %d,/g' %s "%(count,jsonfile))
os.system("sed -i 's/\"name\": \"stb1\",/\"name\": \"%s%d\",/g' %s "%(stbname,i,jsonfile))
os.system("sed -i 's/\"childtable_prefix\": \"stb1_\",/\"childtable_prefix\": \"%s%d_\",/g' %s "%(stbname,i,jsonfile))
threads.append(mp.Process(target=self.taosBench, args=("%s"%jsonfile,)))
start_time = time.time()
for tr in threads:
tr.start()
for tr in threads:
tr.join()
end_time = time.time()
spendTime=end_time-start_time
speedCreate=count/spendTime
tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,count,speedCreate))
return
# test case1 base # test case1 base
def test_case1(self): def test_case1(self):
tdLog.debug("-----create database and tables test------- ") tdLog.debug("-----create database and tables test------- ")
@ -284,6 +336,12 @@ class TDTestCase:
return return
def test_case3(self):
self.taosBenchCreate("db1", "stb1", 1, 2, 1*50000)
return
# #
# add case with filename # add case with filename
# #

View File

@ -0,0 +1,76 @@
{
"filetype": "insert",
"cfgdir": "/etc/taos/",
"host": "test216",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 8,
"thread_count_create_tbl": 8,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
"interlace_rows": 100000,
"num_of_records_per_req": 100000,
"databases": [
{
"dbinfo": {
"name": "db",
"drop": "yes",
"vgroups": 1
},
"super_tables": [
{
"name": "stb1",
"child_table_exists": "no",
"childtable_count": 300000,
"childtable_prefix": "stb1_",
"auto_create_table": "no",
"batch_create_tbl_num": 50000,
"data_source": "rand",
"insert_mode": "taosc",
"insert_rows": 0,
"interlace_rows": 0,
"insert_interval": 0,
"max_sql_len": 10000000,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 10,
"sample_format": "csv",
"use_sample_ts": "no",
"tags_file": "",
"columns": [
{
"type": "INT"
},
{
"type": "DOUBLE",
"count": 100
},
{
"type": "BINARY",
"len": 400,
"count": 10
},
{
"type": "nchar",
"len": 200,
"count": 20
}
],
"tags": [
{
"type": "TINYINT",
"count": 2
},
{
"type": "BINARY",
"len": 16,
"count": 2
}
]
}
]
}
]
}

View File

@ -0,0 +1,299 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import os
selfPath = os.path.dirname(os.path.realpath(__file__))
utilPath="%s/../../pytest/"%selfPath
import threading
import multiprocessing as mp
from numpy.lib.function_base import insert
import taos
sys.path.append(utilPath)
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
import datetime as dt
import time
# constant define
WAITS = 5 # wait seconds
class TDTestCase:
#
# --------------- main frame -------------------
#
def caseDescription(self):
'''
limit and offset keyword function test cases;
case1: limit offset base function test
case2: offset return valid
'''
return
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]
break
return buildPath
# init
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
# tdSql.init(conn.cursor())
# tdSql.prepare()
# self.create_tables();
self.ts = 1500000000000
# run case
def run(self):
# test base case
self.test_case1()
tdLog.debug(" LIMIT test_case1 ............ [OK]")
# test advance case
# self.test_case2()
# tdLog.debug(" LIMIT test_case2 ............ [OK]")
# stop
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
# --------------- case -------------------
# create tables
def create_tables(self,dbname,stbname,count):
tdSql.execute("use %s" %dbname)
tdSql.execute("create stable %s(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"%stbname)
pre_create = "create table"
sql = pre_create
tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
# print(time.time())
exeStartTime=time.time()
for i in range(count):
sql += " %s_%d using %s tags(%d)"%(stbname,i,stbname,i+1)
if i >0 and i%3000 == 0:
tdSql.execute(sql)
sql = pre_create
# print(time.time())
# end sql
if sql != pre_create:
tdSql.execute(sql)
exeEndTime=time.time()
spendTime=exeEndTime-exeStartTime
speedCreate=count/spendTime
tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,count,speedCreate))
return
def newcur(self,host,cfg):
user = "root"
password = "taosdata"
port =6030
con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port)
cur=con.cursor()
print(cur)
return cur
def new_create_tables(self,dbname,vgroups,stbname,tcountStart,tcountStop):
host = "127.0.0.1"
buildPath = self.getBuildPath()
config = buildPath+ "../sim/dnode1/cfg/"
tsql=self.newcur(host,config)
tsql.execute("drop database if exists %s" %(dbname))
tsql.execute("create database if not exists %s vgroups %d"%(dbname,vgroups))
tsql.execute("use %s" %dbname)
tsql.execute("create stable %s(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"%stbname)
pre_create = "create table"
sql = pre_create
tcountStop=int(tcountStop)
tcountStart=int(tcountStart)
count=tcountStop-tcountStart
tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
# print(time.time())
exeStartTime=time.time()
# print(type(tcountStop),type(tcountStart))
for i in range(tcountStart,tcountStop):
sql += " %s_%d using %s tags(%d)"%(stbname,i,stbname,i+1)
if i >0 and i%20000 == 0:
# print(sql)
tsql.execute(sql)
sql = pre_create
# print(time.time())
# end sql
if sql != pre_create:
# print(sql)
tsql.execute(sql)
exeEndTime=time.time()
spendTime=exeEndTime-exeStartTime
speedCreate=count/spendTime
# tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,count,speedCreate))
return
# insert data
def insert_data(self, dbname, stbname, ts_start, tcountStart,tcountStop,rowCount):
tdSql.execute("use %s" %dbname)
pre_insert = "insert into "
sql = pre_insert
tcount=tcountStop-tcountStart
allRows=tcount*rowCount
tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbname, allRows))
exeStartTime=time.time()
for i in range(tcountStart,tcountStop):
sql += " %s_%d values "%(stbname,i)
for j in range(rowCount):
sql += "(%d, %d, 'taos_%d') "%(ts_start + j*1000, j, j)
if j >0 and j%5000 == 0:
# print(sql)
tdSql.execute(sql)
sql = "insert into %s_%d values " %(stbname,i)
# end sql
if sql != pre_insert:
# print(sql)
tdSql.execute(sql)
exeEndTime=time.time()
spendTime=exeEndTime-exeStartTime
speedInsert=allRows/spendTime
# tdLog.debug("spent %.2fs to INSERT %d rows , insert rate is %.2f rows/s... [OK]"% (spendTime,allRows,speedInsert))
tdLog.debug("INSERT TABLE DATA ............ [OK]")
return
# test case1 base
def test_case1(self):
tdLog.debug("-----create database and tables test------- ")
# tdSql.execute("drop database if exists db1")
# tdSql.execute("drop database if exists db4")
# tdSql.execute("drop database if exists db6")
# tdSql.execute("drop database if exists db8")
# tdSql.execute("drop database if exists db12")
# tdSql.execute("drop database if exists db16")
#create database and tables;
# tdSql.execute("create database db11 vgroups 1")
# # self.create_tables("db1", "stb1", 30*10000)
# tdSql.execute("use db1")
# tdSql.execute("create stable stb1(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)")
# tdSql.execute("create database db12 vgroups 1")
# # self.create_tables("db1", "stb1", 30*10000)
# tdSql.execute("use db1")
# t1 = threading.Thread(target=self.new_create_tables("db1", "stb1", 15*10000), args=(1,))
# t2 = threading.Thread(target=self.new_create_tables("db1", "stb1", 15*10000), args=(2,))
# t1 = mp.Process(target=self.new_create_tables, args=("db1", "stb1", 0,count/2,))
# t2 = mp.Process(target=self.new_create_tables, args=("db1", "stb1", count/2,count,))
count=500000
vgroups=1
threads = []
threadNumbers=2
for i in range(threadNumbers):
threads.append(mp.Process(target=self.new_create_tables, args=("db1%d"%i, vgroups, "stb1", 0,count,)))
start_time = time.time()
for tr in threads:
tr.start()
for tr in threads:
tr.join()
end_time = time.time()
spendTime=end_time-start_time
speedCreate=count/spendTime
tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,count,speedCreate))
# self.new_create_tables("db1", "stb1", 15*10000)
# self.new_create_tables("db1", "stb1", 15*10000)
# tdSql.execute("create database db4 vgroups 4")
# self.create_tables("db4", "stb4", 30*10000)
# tdSql.execute("create database db6 vgroups 6")
# self.create_tables("db6", "stb6", 30*10000)
# tdSql.execute("create database db8 vgroups 8")
# self.create_tables("db8", "stb8", 30*10000)
# tdSql.execute("create database db12 vgroups 12")
# self.create_tables("db12", "stb12", 30*10000)
# tdSql.execute("create database db16 vgroups 16")
# self.create_tables("db16", "stb16", 30*10000)
return
# test case2 base:insert data
def test_case2(self):
tdLog.debug("-----insert data test------- ")
# drop database
tdSql.execute("drop database if exists db1")
tdSql.execute("drop database if exists db4")
tdSql.execute("drop database if exists db6")
tdSql.execute("drop database if exists db8")
tdSql.execute("drop database if exists db12")
tdSql.execute("drop database if exists db16")
#create database and tables;
tdSql.execute("create database db1 vgroups 1")
self.create_tables("db1", "stb1", 1*100)
self.insert_data("db1", "stb1", self.ts, 1*50,1*10000)
tdSql.execute("create database db4 vgroups 4")
self.create_tables("db4", "stb4", 1*100)
self.insert_data("db4", "stb4", self.ts, 1*100,1*10000)
tdSql.execute("create database db6 vgroups 6")
self.create_tables("db6", "stb6", 1*100)
self.insert_data("db6", "stb6", self.ts, 1*100,1*10000)
tdSql.execute("create database db8 vgroups 8")
self.create_tables("db8", "stb8", 1*100)
self.insert_data("db8", "stb8", self.ts, 1*100,1*10000)
tdSql.execute("create database db12 vgroups 12")
self.create_tables("db12", "stb12", 1*100)
self.insert_data("db12", "stb12", self.ts, 1*100,1*10000)
tdSql.execute("create database db16 vgroups 16")
self.create_tables("db16", "stb16", 1*100)
self.insert_data("db16", "stb16", self.ts, 1*100,1*10000)
return
#
# add case with filename
#
# tdCases.addWindows(__file__, TDTestCase())
# tdCases.addLinux(__file__, TDTestCase())
case=TDTestCase()
case.test_case1()

View File

@ -6,7 +6,7 @@ python3 ./test.py -f 0-others/taosShell.py
python3 ./test.py -f 0-others/taosShellError.py python3 ./test.py -f 0-others/taosShellError.py
python3 ./test.py -f 0-others/taosShellNetChk.py python3 ./test.py -f 0-others/taosShellNetChk.py
python3 ./test.py -f 0-others/telemetry.py python3 ./test.py -f 0-others/telemetry.py
python3 ./test.py -f 0-others/taosdMonitor.py
#python3 ./test.py -f 2-query/between.py #python3 ./test.py -f 2-query/between.py
python3 ./test.py -f 2-query/distinct.py python3 ./test.py -f 2-query/distinct.py
@ -32,8 +32,7 @@ python3 ./test.py -f 2-query/last.py
python3 ./test.py -f 2-query/To_unixtimestamp.py python3 ./test.py -f 2-query/To_unixtimestamp.py
python3 ./test.py -f 2-query/timetruncate.py python3 ./test.py -f 2-query/timetruncate.py
python3 ./test.py -f 2-query/Timediff.py # python3 ./test.py -f 2-query/Timediff.py
# python3 ./test.py -f 2-query/diff.py
#python3 ./test.py -f 2-query/cast.py #python3 ./test.py -f 2-query/cast.py