Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/stream_compression
This commit is contained in:
commit
2eca067609
|
@ -106,6 +106,8 @@ void mndPostProcessQueryMsg(SRpcMsg *pMsg);
|
|||
*/
|
||||
void mndGenerateMachineCode();
|
||||
|
||||
void mndDumpSdb();
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -151,13 +151,14 @@ typedef struct {
|
|||
typedef struct {
|
||||
SRequestObj *request;
|
||||
tsem_t sem;
|
||||
int32_t cnt;
|
||||
int32_t total;
|
||||
TdThreadSpinlock lock;
|
||||
} Params;
|
||||
|
||||
typedef struct {
|
||||
int64_t id;
|
||||
Params *params;
|
||||
bool isLast;
|
||||
|
||||
SMLProtocolType protocol;
|
||||
int8_t precision;
|
||||
|
@ -2449,28 +2450,26 @@ static void smlInsertCallback(void *param, void *res, int32_t code) {
|
|||
int32_t rows = taos_affected_rows(pRequest);
|
||||
|
||||
uDebug("SML:0x%" PRIx64 " result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf);
|
||||
// lock
|
||||
taosThreadSpinLock(&info->params->lock);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
info->params->request->code = code;
|
||||
info->params->request->body.resInfo.numOfRows += rows;
|
||||
}else{
|
||||
info->params->request->body.resInfo.numOfRows += info->affectedRows;
|
||||
}
|
||||
taosThreadSpinUnlock(&info->params->lock);
|
||||
// unlock
|
||||
|
||||
uDebug("SML:0x%" PRIx64 " insert finished, code: %d, rows: %d, total: %d", info->id, code, rows, info->affectedRows);
|
||||
Params *pParam = info->params;
|
||||
bool isLast = info->isLast;
|
||||
// lock
|
||||
taosThreadSpinLock(&pParam->lock);
|
||||
pParam->cnt++;
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pParam->request->code = code;
|
||||
pParam->request->body.resInfo.numOfRows += rows;
|
||||
}else{
|
||||
pParam->request->body.resInfo.numOfRows += info->affectedRows;
|
||||
}
|
||||
if (pParam->cnt == pParam->total) {
|
||||
tsem_post(&pParam->sem);
|
||||
}
|
||||
taosThreadSpinUnlock(&pParam->lock);
|
||||
// unlock
|
||||
uDebug("SML:0x%" PRIx64 " insert finished, code: %d, rows: %d, total: %d", info->id, code, rows, info->affectedRows);
|
||||
info->cost.endTime = taosGetTimestampUs();
|
||||
info->cost.code = code;
|
||||
smlPrintStatisticInfo(info);
|
||||
smlDestroyInfo(info);
|
||||
|
||||
if (isLast) {
|
||||
tsem_post(&pParam->sem);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2512,7 +2511,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
|
|||
pTscObj->schemalessType = 1;
|
||||
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
|
||||
|
||||
Params params;
|
||||
Params params = {0};
|
||||
params.request = request;
|
||||
tsem_init(¶ms.sem, 0, 0);
|
||||
taosThreadSpinInit(&(params.lock), 0);
|
||||
|
@ -2557,6 +2556,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
|
|||
}
|
||||
|
||||
batchs = ceil(((double)numLines) / LINE_BATCH);
|
||||
params.total = batchs;
|
||||
for (int i = 0; i < batchs; ++i) {
|
||||
SRequestObj* req = (SRequestObj*)createRequest(pTscObj->id, TSDB_SQL_INSERT);
|
||||
if(!req){
|
||||
|
@ -2575,11 +2575,9 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
|
|||
|
||||
if (numLines > perBatch) {
|
||||
numLines -= perBatch;
|
||||
info->isLast = false;
|
||||
} else {
|
||||
perBatch = numLines;
|
||||
numLines = 0;
|
||||
info->isLast = true;
|
||||
}
|
||||
|
||||
info->params = ¶ms;
|
||||
|
|
|
@ -16,10 +16,12 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "dmMgmt.h"
|
||||
#include "tconfig.h"
|
||||
#include "mnode.h"
|
||||
|
||||
#define DM_APOLLO_URL "The apollo string to use when configuring the server, such as: -a 'jsonFile:./tests/cfg.json', cfg.json text can be '{\"fqdn\":\"td1\"}'."
|
||||
#define DM_CFG_DIR "Configuration directory."
|
||||
#define DM_DMP_CFG "Dump configuration."
|
||||
#define DM_SDB_INFO "Dump sdb info."
|
||||
#define DM_ENV_CMD "The env cmd variable string to use when configuring the server, such as: -e 'TAOS_FQDN=td1'."
|
||||
#define DM_ENV_FILE "The env variable file path to use when configuring the server, default is './.env', .env text can be 'TAOS_FQDN=td1'."
|
||||
#define DM_NODE_TYPE "Startup type of the node, default is 0."
|
||||
|
@ -31,6 +33,7 @@ static struct {
|
|||
bool winServiceMode;
|
||||
#endif
|
||||
bool dumpConfig;
|
||||
bool dumpSdb;
|
||||
bool generateGrant;
|
||||
bool printAuth;
|
||||
bool printVersion;
|
||||
|
@ -82,6 +85,8 @@ static int32_t dmParseArgs(int32_t argc, char const *argv[]) {
|
|||
}
|
||||
} else if (strcmp(argv[i], "-a") == 0) {
|
||||
tstrncpy(global.apolloUrl, argv[++i], PATH_MAX);
|
||||
} else if (strcmp(argv[i], "-s") == 0) {
|
||||
global.dumpSdb = true;
|
||||
} else if (strcmp(argv[i], "-E") == 0) {
|
||||
tstrncpy(global.envFile, argv[++i], PATH_MAX);
|
||||
} else if (strcmp(argv[i], "-n") == 0) {
|
||||
|
@ -131,6 +136,7 @@ static void dmPrintHelp() {
|
|||
printf("Usage: taosd [OPTION...] \n\n");
|
||||
printf("%s%s%s%s\n", indent, "-a,", indent, DM_APOLLO_URL);
|
||||
printf("%s%s%s%s\n", indent, "-c,", indent, DM_CFG_DIR);
|
||||
printf("%s%s%s%s\n", indent, "-s,", indent, DM_SDB_INFO);
|
||||
printf("%s%s%s%s\n", indent, "-C,", indent, DM_DMP_CFG);
|
||||
printf("%s%s%s%s\n", indent, "-e,", indent, DM_ENV_CMD);
|
||||
printf("%s%s%s%s\n", indent, "-E,", indent, DM_ENV_FILE);
|
||||
|
@ -229,6 +235,14 @@ int mainWindows(int argc,char** argv) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
if (global.dumpSdb) {
|
||||
mndDumpSdb();
|
||||
taosCleanupCfg();
|
||||
taosCloseLog();
|
||||
taosCleanupArgs();
|
||||
return 0;
|
||||
}
|
||||
|
||||
dmSetProcInfo(argc, (char **)argv);
|
||||
taosCleanupArgs();
|
||||
|
||||
|
|
|
@ -40,6 +40,8 @@ int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char* dbFName, char* stbFName, vo
|
|||
void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst);
|
||||
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize);
|
||||
|
||||
const char *mndGetStbStr(const char *src);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -54,13 +54,15 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
|
|||
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
|
||||
|
||||
int32_t mndInitConsumer(SMnode *pMnode) {
|
||||
SSdbTable table = {.sdbType = SDB_CONSUMER,
|
||||
.keyType = SDB_KEY_INT64,
|
||||
.encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
|
||||
.decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
|
||||
.insertFp = (SdbInsertFp)mndConsumerActionInsert,
|
||||
.updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
|
||||
.deleteFp = (SdbDeleteFp)mndConsumerActionDelete};
|
||||
SSdbTable table = {
|
||||
.sdbType = SDB_CONSUMER,
|
||||
.keyType = SDB_KEY_INT64,
|
||||
.encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
|
||||
.decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
|
||||
.insertFp = (SdbInsertFp)mndConsumerActionInsert,
|
||||
.updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
|
||||
.deleteFp = (SdbDeleteFp)mndConsumerActionDelete,
|
||||
};
|
||||
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_MQ_HB, mndProcessMqHbReq);
|
||||
|
|
|
@ -0,0 +1,645 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "mndDb.h"
|
||||
#include "mndInt.h"
|
||||
#include "mndStb.h"
|
||||
#include "sdb.h"
|
||||
#include "tconfig.h"
|
||||
#include "tjson.h"
|
||||
#include "ttypes.h"
|
||||
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wunused-result"
|
||||
|
||||
void reportStartup(const char *name, const char *desc) {}
|
||||
void sendRsp(SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); }
|
||||
|
||||
int32_t sendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
char *i642str(int64_t val) {
|
||||
static char str[24] = {0};
|
||||
snprintf(str, sizeof(str), "%" PRId64, val);
|
||||
return str;
|
||||
}
|
||||
|
||||
void dumpFunc(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonAddArrayToObject(json, "funcs");
|
||||
|
||||
while (1) {
|
||||
SFuncObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_FUNC, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToArray(items, item);
|
||||
tjsonAddStringToObject(item, "name", pObj->name);
|
||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||
tjsonAddStringToObject(item, "funcType", i642str(pObj->funcType));
|
||||
tjsonAddStringToObject(item, "scriptType", i642str(pObj->scriptType));
|
||||
tjsonAddStringToObject(item, "align", i642str(pObj->align));
|
||||
tjsonAddStringToObject(item, "outputType", i642str(pObj->outputType));
|
||||
tjsonAddStringToObject(item, "outputLen", i642str(pObj->outputLen));
|
||||
tjsonAddStringToObject(item, "bufSize", i642str(pObj->bufSize));
|
||||
tjsonAddStringToObject(item, "signature", i642str(pObj->signature));
|
||||
tjsonAddStringToObject(item, "commentSize", i642str(pObj->commentSize));
|
||||
tjsonAddStringToObject(item, "codeSize", i642str(pObj->codeSize));
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpDb(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonCreateObject();
|
||||
tjsonAddItemToObject(json, "dbs", items);
|
||||
|
||||
while (1) {
|
||||
SDbObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToObject(items, "db", item);
|
||||
|
||||
tjsonAddStringToObject(item, "name", mndGetDbStr(pObj->name));
|
||||
tjsonAddStringToObject(item, "acct", pObj->acct);
|
||||
tjsonAddStringToObject(item, "createUser", pObj->createUser);
|
||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||
tjsonAddStringToObject(item, "uid", i642str(pObj->uid));
|
||||
tjsonAddStringToObject(item, "cfgVersion", i642str(pObj->cfgVersion));
|
||||
tjsonAddStringToObject(item, "vgVersion", i642str(pObj->vgVersion));
|
||||
tjsonAddStringToObject(item, "numOfVgroups", i642str(pObj->cfg.numOfVgroups));
|
||||
tjsonAddStringToObject(item, "numOfStables", i642str(pObj->cfg.numOfStables));
|
||||
tjsonAddStringToObject(item, "buffer", i642str(pObj->cfg.buffer));
|
||||
tjsonAddStringToObject(item, "pageSize", i642str(pObj->cfg.pageSize));
|
||||
tjsonAddStringToObject(item, "pages", i642str(pObj->cfg.pages));
|
||||
tjsonAddStringToObject(item, "cacheLastSize", i642str(pObj->cfg.cacheLastSize));
|
||||
tjsonAddStringToObject(item, "daysPerFile", i642str(pObj->cfg.daysPerFile));
|
||||
tjsonAddStringToObject(item, "daysToKeep0", i642str(pObj->cfg.daysToKeep0));
|
||||
tjsonAddStringToObject(item, "daysToKeep1", i642str(pObj->cfg.daysToKeep1));
|
||||
tjsonAddStringToObject(item, "daysToKeep2", i642str(pObj->cfg.daysToKeep2));
|
||||
tjsonAddStringToObject(item, "minRows", i642str(pObj->cfg.minRows));
|
||||
tjsonAddStringToObject(item, "maxRows", i642str(pObj->cfg.maxRows));
|
||||
tjsonAddStringToObject(item, "precision", i642str(pObj->cfg.precision));
|
||||
tjsonAddStringToObject(item, "compression", i642str(pObj->cfg.compression));
|
||||
tjsonAddStringToObject(item, "replications", i642str(pObj->cfg.replications));
|
||||
tjsonAddStringToObject(item, "strict", i642str(pObj->cfg.strict));
|
||||
tjsonAddStringToObject(item, "cacheLast",i642str( pObj->cfg.cacheLast));
|
||||
tjsonAddStringToObject(item, "hashMethod", i642str(pObj->cfg.hashMethod));
|
||||
tjsonAddStringToObject(item, "hashPrefix", i642str(pObj->cfg.hashPrefix));
|
||||
tjsonAddStringToObject(item, "hashSuffix", i642str(pObj->cfg.hashSuffix));
|
||||
tjsonAddStringToObject(item, "sstTrigger", i642str(pObj->cfg.sstTrigger));
|
||||
tjsonAddStringToObject(item, "tsdbPageSize",i642str( pObj->cfg.tsdbPageSize));
|
||||
tjsonAddStringToObject(item, "schemaless", i642str(pObj->cfg.schemaless));
|
||||
tjsonAddStringToObject(item, "walLevel",i642str( pObj->cfg.walLevel));
|
||||
tjsonAddStringToObject(item, "walFsyncPeriod", i642str(pObj->cfg.walFsyncPeriod));
|
||||
tjsonAddStringToObject(item, "walRetentionPeriod", i642str(pObj->cfg.walRetentionPeriod));
|
||||
tjsonAddStringToObject(item, "walRetentionSize",i642str( pObj->cfg.walRetentionSize));
|
||||
tjsonAddStringToObject(item, "walRollPeriod", i642str(pObj->cfg.walRollPeriod));
|
||||
tjsonAddStringToObject(item, "walSegmentSize", i642str(pObj->cfg.walSegmentSize));
|
||||
|
||||
tjsonAddStringToObject(item, "numOfRetensions",i642str( pObj->cfg.numOfRetensions));
|
||||
for (int32_t i = 0; i < pObj->cfg.numOfRetensions; ++i) {
|
||||
SJson *rentensions = tjsonAddArrayToObject(item, "rentensions");
|
||||
SJson *rentension = tjsonCreateObject();
|
||||
tjsonAddItemToArray(rentensions, rentension);
|
||||
|
||||
SRetention *pRetension = taosArrayGet(pObj->cfg.pRetensions, i);
|
||||
tjsonAddStringToObject(item, "freq", i642str(pRetension->freq));
|
||||
tjsonAddStringToObject(item, "freqUnit", i642str(pRetension->freqUnit));
|
||||
tjsonAddStringToObject(item, "keep", i642str(pRetension->keep));
|
||||
tjsonAddStringToObject(item, "keepUnit",i642str( pRetension->keepUnit));
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpStb(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonAddArrayToObject(json, "stbs");
|
||||
|
||||
while (1) {
|
||||
SStbObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToArray(items, item);
|
||||
tjsonAddStringToObject(item, "name", mndGetStbStr(pObj->name));
|
||||
tjsonAddStringToObject(item, "db", mndGetDbStr(pObj->db));
|
||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||
tjsonAddStringToObject(item, "uid", i642str(pObj->uid));
|
||||
tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid));
|
||||
tjsonAddStringToObject(item, "tagVer",i642str( pObj->tagVer));
|
||||
tjsonAddStringToObject(item, "colVer", i642str(pObj->colVer));
|
||||
tjsonAddStringToObject(item, "smaVer", i642str(pObj->smaVer));
|
||||
tjsonAddStringToObject(item, "nextColId", i642str(pObj->nextColId));
|
||||
tjsonAddStringToObject(item, "watermark1", i642str(pObj->watermark[0]));
|
||||
tjsonAddStringToObject(item, "watermark2", i642str(pObj->watermark[1]));
|
||||
tjsonAddStringToObject(item, "maxdelay0",i642str( pObj->maxdelay[0]));
|
||||
tjsonAddStringToObject(item, "maxdelay1",i642str( pObj->maxdelay[1]));
|
||||
tjsonAddStringToObject(item, "ttl",i642str( pObj->ttl));
|
||||
tjsonAddStringToObject(item, "numOfFuncs",i642str( pObj->numOfFuncs));
|
||||
tjsonAddStringToObject(item, "commentLen", i642str(pObj->commentLen));
|
||||
tjsonAddStringToObject(item, "ast1Len", i642str(pObj->ast1Len));
|
||||
tjsonAddStringToObject(item, "ast2Len",i642str( pObj->ast2Len));
|
||||
|
||||
tjsonAddStringToObject(item, "numOfColumns",i642str( pObj->numOfColumns));
|
||||
SJson *columns = tjsonAddArrayToObject(item, "columns");
|
||||
for (int32_t i = 0; i < pObj->numOfColumns; ++i) {
|
||||
SJson *column = tjsonCreateObject();
|
||||
tjsonAddItemToArray(columns, column);
|
||||
|
||||
SSchema *pColumn = &pObj->pColumns[i];
|
||||
tjsonAddStringToObject(column, "type", i642str(pColumn->type));
|
||||
tjsonAddStringToObject(column, "typestr", tDataTypes[pColumn->type].name);
|
||||
tjsonAddStringToObject(column, "flags", i642str(pColumn->flags));
|
||||
tjsonAddStringToObject(column, "colId", i642str(pColumn->colId));
|
||||
tjsonAddStringToObject(column, "bytes", i642str(pColumn->bytes));
|
||||
tjsonAddStringToObject(column, "name", pColumn->name);
|
||||
}
|
||||
|
||||
tjsonAddStringToObject(item, "numOfTags", i642str(pObj->numOfTags));
|
||||
SJson *tags = tjsonAddArrayToObject(item, "tags");
|
||||
for (int32_t i = 0; i < pObj->numOfTags; ++i) {
|
||||
SJson *tag = tjsonCreateObject();
|
||||
tjsonAddItemToArray(tags, tag);
|
||||
|
||||
SSchema *pTag = &pObj->pTags[i];
|
||||
tjsonAddStringToObject(tag, "type", i642str(pTag->type));
|
||||
tjsonAddStringToObject(tag, "typestr", tDataTypes[pTag->type].name);
|
||||
tjsonAddStringToObject(tag, "flags",i642str( pTag->flags));
|
||||
tjsonAddStringToObject(tag, "colId", i642str(pTag->colId));
|
||||
tjsonAddStringToObject(tag, "bytes", i642str(pTag->bytes));
|
||||
tjsonAddStringToObject(tag, "name", pTag->name);
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpSma(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonAddArrayToObject(json, "smas");
|
||||
|
||||
while (1) {
|
||||
SSmaObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToArray(items, item);
|
||||
tjsonAddStringToObject(item, "name", mndGetStbStr(pObj->name));
|
||||
tjsonAddStringToObject(item, "stb", mndGetStbStr(pObj->stb));
|
||||
tjsonAddStringToObject(item, "db", mndGetDbStr(pObj->db));
|
||||
tjsonAddStringToObject(item, "dstTbName", mndGetStbStr(pObj->dstTbName));
|
||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||
tjsonAddStringToObject(item, "uid", i642str(pObj->uid));
|
||||
tjsonAddStringToObject(item, "stbUid", i642str(pObj->stbUid));
|
||||
tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid));
|
||||
tjsonAddStringToObject(item, "dstTbUid", i642str(pObj->dstTbUid));
|
||||
tjsonAddStringToObject(item, "intervalUnit", i642str(pObj->intervalUnit));
|
||||
tjsonAddStringToObject(item, "slidingUnit",i642str( pObj->slidingUnit));
|
||||
tjsonAddStringToObject(item, "timezone", i642str(pObj->timezone));
|
||||
tjsonAddStringToObject(item, "dstVgId",i642str( pObj->dstVgId));
|
||||
tjsonAddStringToObject(item, "interval", i642str(pObj->interval));
|
||||
tjsonAddStringToObject(item, "offset", i642str(pObj->offset));
|
||||
tjsonAddStringToObject(item, "sliding", i642str(pObj->sliding));
|
||||
tjsonAddStringToObject(item, "exprLen",i642str( pObj->exprLen));
|
||||
tjsonAddStringToObject(item, "tagsFilterLen", i642str(pObj->tagsFilterLen));
|
||||
tjsonAddStringToObject(item, "sqlLen",i642str( pObj->sqlLen));
|
||||
tjsonAddStringToObject(item, "astLen",i642str( pObj->astLen));
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpVgroup(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonAddArrayToObject(json, "vgroups");
|
||||
|
||||
while (1) {
|
||||
SVgObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToArray(items, item);
|
||||
tjsonAddStringToObject(item, "vgId", i642str(pObj->vgId));
|
||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||
tjsonAddStringToObject(item, "version",i642str(pObj->version));
|
||||
tjsonAddStringToObject(item, "hashBegin", i642str(pObj->hashBegin));
|
||||
tjsonAddStringToObject(item, "hashEnd", i642str(pObj->hashEnd));
|
||||
tjsonAddStringToObject(item, "db", mndGetDbStr(pObj->dbName));
|
||||
tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid));
|
||||
tjsonAddStringToObject(item, "isTsma", i642str(pObj->isTsma));
|
||||
tjsonAddStringToObject(item, "replica",i642str( pObj->replica));
|
||||
for (int32_t i = 0; i < pObj->replica; ++i) {
|
||||
SJson *replicas = tjsonAddArrayToObject(item, "replicas");
|
||||
SJson *replica = tjsonCreateObject();
|
||||
tjsonAddItemToArray(replicas, replica);
|
||||
tjsonAddStringToObject(replica, "dnodeId", i642str(pObj->vnodeGid[i].dnodeId));
|
||||
}
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpTopic(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonAddArrayToObject(json, "topics");
|
||||
|
||||
while (1) {
|
||||
SMqTopicObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToArray(items, item);
|
||||
tjsonAddStringToObject(item, "name", mndGetDbStr(pObj->name));
|
||||
tjsonAddStringToObject(item, "name", mndGetDbStr(pObj->db));
|
||||
tjsonAddStringToObject(item, "createTime", i642str(pObj->createTime));
|
||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||
tjsonAddStringToObject(item, "uid", i642str(pObj->uid));
|
||||
tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid));
|
||||
tjsonAddStringToObject(item, "version",i642str( pObj->version));
|
||||
tjsonAddStringToObject(item, "subType",i642str( pObj->subType));
|
||||
tjsonAddStringToObject(item, "withMeta", i642str(pObj->withMeta));
|
||||
tjsonAddStringToObject(item, "stbUid", i642str(pObj->stbUid));
|
||||
tjsonAddStringToObject(item, "sqlLen", i642str(pObj->sqlLen));
|
||||
tjsonAddStringToObject(item, "astLen",i642str( pObj->astLen));
|
||||
tjsonAddStringToObject(item, "sqlLen",i642str( pObj->sqlLen));
|
||||
tjsonAddStringToObject(item, "ntbUid", i642str(pObj->ntbUid));
|
||||
tjsonAddStringToObject(item, "ctbStbUid", i642str(pObj->ctbStbUid));
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpConsumer(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonAddArrayToObject(json, "consumers");
|
||||
|
||||
while (1) {
|
||||
SMqConsumerObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToArray(items, item);
|
||||
tjsonAddStringToObject(item, "consumerId", i642str(pObj->consumerId));
|
||||
tjsonAddStringToObject(item, "cgroup", pObj->cgroup);
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpSubscribe(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonAddArrayToObject(json, "subscribes");
|
||||
|
||||
while (1) {
|
||||
SMqSubscribeObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToArray(items, item);
|
||||
tjsonAddStringToObject(item, "key", pObj->key);
|
||||
tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid));
|
||||
tjsonAddStringToObject(item, "stbUid", i642str(pObj->stbUid));
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpOffset(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonAddArrayToObject(json, "offsets");
|
||||
|
||||
while (1) {
|
||||
SMqOffsetObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_OFFSET, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToArray(items, item);
|
||||
tjsonAddStringToObject(item, "key", pObj->key);
|
||||
tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid));
|
||||
tjsonAddStringToObject(item, "offset", i642str(pObj->offset));
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpStream(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonAddArrayToObject(json, "streams");
|
||||
|
||||
while (1) {
|
||||
SStreamObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToArray(items, item);
|
||||
tjsonAddStringToObject(item, "name", mndGetDbStr(pObj->name));
|
||||
tjsonAddStringToObject(item, "createTime", i642str(pObj->createTime));
|
||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||
tjsonAddStringToObject(item, "version", i642str(pObj->version));
|
||||
tjsonAddStringToObject(item, "totalLevel", i642str(pObj->totalLevel));
|
||||
tjsonAddStringToObject(item, "smaId", i642str(pObj->smaId));
|
||||
tjsonAddStringToObject(item, "uid", i642str(pObj->uid));
|
||||
tjsonAddStringToObject(item, "status",i642str( pObj->status));
|
||||
tjsonAddStringToObject(item, "igExpired",i642str( pObj->igExpired));
|
||||
tjsonAddStringToObject(item, "trigger",i642str( pObj->trigger));
|
||||
tjsonAddStringToObject(item, "triggerParam", i642str(pObj->triggerParam));
|
||||
tjsonAddStringToObject(item, "watermark", i642str(pObj->watermark));
|
||||
tjsonAddStringToObject(item, "sourceDbUid", i642str(pObj->sourceDbUid));
|
||||
tjsonAddStringToObject(item, "targetDbUid", i642str(pObj->targetDbUid));
|
||||
tjsonAddStringToObject(item, "sourceDb", mndGetDbStr(pObj->sourceDb));
|
||||
tjsonAddStringToObject(item, "targetDb", mndGetDbStr(pObj->targetDb));
|
||||
tjsonAddStringToObject(item, "targetSTbName", mndGetStbStr(pObj->targetSTbName));
|
||||
tjsonAddStringToObject(item, "targetStbUid", i642str(pObj->targetStbUid));
|
||||
tjsonAddStringToObject(item, "fixedSinkVgId", i642str(pObj->fixedSinkVgId));
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpAcct(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonAddArrayToObject(json, "accts");
|
||||
|
||||
while (1) {
|
||||
SAcctObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_ACCT, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToArray(items, item);
|
||||
tjsonAddStringToObject(item, "acct", pObj->acct);
|
||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||
tjsonAddStringToObject(item, "acctId", i642str(pObj->acctId));
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpAuth(SSdb *pSdb, SJson *json) {
|
||||
// todo
|
||||
}
|
||||
|
||||
void dumpUser(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonAddArrayToObject(json, "users");
|
||||
|
||||
while (1) {
|
||||
SUserObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_USER, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToArray(items, item);
|
||||
tjsonAddStringToObject(item, "name", pObj->user);
|
||||
tjsonAddStringToObject(item, "acct", pObj->acct);
|
||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||
tjsonAddStringToObject(item, "superUser",i642str( pObj->superUser));
|
||||
tjsonAddStringToObject(item, "authVersion", i642str(pObj->authVersion));
|
||||
tjsonAddStringToObject(item, "numOfReadDbs",i642str( taosHashGetSize(pObj->readDbs)));
|
||||
tjsonAddStringToObject(item, "numOfWriteDbs", i642str(taosHashGetSize(pObj->writeDbs)));
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpDnode(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonAddArrayToObject(json, "dnodes");
|
||||
|
||||
while (1) {
|
||||
SDnodeObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToArray(items, item);
|
||||
tjsonAddStringToObject(item, "id",i642str( pObj->id));
|
||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||
tjsonAddStringToObject(item, "port",i642str( pObj->port));
|
||||
tjsonAddStringToObject(item, "fqdn", pObj->fqdn);
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpBnode(SSdb *pSdb, SJson *json) {
|
||||
// not implemented yet
|
||||
}
|
||||
|
||||
void dumpSnode(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonAddArrayToObject(json, "snodes");
|
||||
|
||||
while (1) {
|
||||
SSnodeObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_QNODE, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToArray(items, item);
|
||||
tjsonAddStringToObject(item, "id",i642str( pObj->id));
|
||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpQnode(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonAddArrayToObject(json, "qnodes");
|
||||
|
||||
while (1) {
|
||||
SQnodeObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_QNODE, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToArray(items, item);
|
||||
tjsonAddStringToObject(item, "id", i642str(pObj->id));
|
||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpMnode(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonAddArrayToObject(json, "mnodes");
|
||||
|
||||
while (1) {
|
||||
SMnodeObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToArray(items, item);
|
||||
tjsonAddStringToObject(item, "id", i642str(pObj->id));
|
||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpCluster(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonAddArrayToObject(json, "clusters");
|
||||
|
||||
while (1) {
|
||||
SClusterObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_CLUSTER, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToArray(items, item);
|
||||
tjsonAddStringToObject(item, "id", i642str(pObj->id));
|
||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||
tjsonAddStringToObject(item, "name", pObj->name);
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpTrans(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonAddArrayToObject(json, "transactions");
|
||||
|
||||
while (1) {
|
||||
STrans *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_TRANS, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToArray(items, item);
|
||||
tjsonAddStringToObject(item, "id", i642str(pObj->id));
|
||||
tjsonAddStringToObject(item, "stage", i642str(pObj->stage));
|
||||
tjsonAddStringToObject(item, "policy", i642str(pObj->policy));
|
||||
tjsonAddStringToObject(item, "conflict",i642str( pObj->conflict));
|
||||
tjsonAddStringToObject(item, "exec", i642str(pObj->exec));
|
||||
tjsonAddStringToObject(item, "oper", i642str(pObj->oper));
|
||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||
tjsonAddStringToObject(item, "dbname", pObj->dbname);
|
||||
tjsonAddStringToObject(item, "stbname", pObj->stbname);
|
||||
tjsonAddStringToObject(item, "opername", pObj->opername);
|
||||
tjsonAddStringToObject(item, "commitLogNum",i642str( taosArrayGetSize(pObj->commitActions)));
|
||||
tjsonAddStringToObject(item, "redoActionNum",i642str(taosArrayGetSize(pObj->redoActions)));
|
||||
tjsonAddStringToObject(item, "undoActionNum", i642str(taosArrayGetSize(pObj->undoActions)));
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpHeader(SSdb *pSdb, SJson *json) {
|
||||
tjsonAddStringToObject(json, "sver", i642str(1));
|
||||
tjsonAddStringToObject(json, "applyIndex", i642str(pSdb->applyIndex));
|
||||
tjsonAddStringToObject(json, "applyTerm", i642str(pSdb->applyTerm));
|
||||
tjsonAddStringToObject(json, "applyConfig", i642str(pSdb->applyConfig));
|
||||
|
||||
SJson *maxIdsJson = tjsonCreateObject();
|
||||
tjsonAddItemToObject(json, "maxIds", maxIdsJson);
|
||||
for (int32_t i = 0; i < SDB_MAX; ++i) {
|
||||
int64_t maxId = 0;
|
||||
if (i < SDB_MAX) {
|
||||
maxId = pSdb->maxId[i];
|
||||
}
|
||||
tjsonAddStringToObject(maxIdsJson, sdbTableName(i), i642str(maxId));
|
||||
}
|
||||
|
||||
SJson *tableVersJson = tjsonCreateObject();
|
||||
tjsonAddItemToObject(json, "tableVers", tableVersJson);
|
||||
for (int32_t i = 0; i < SDB_MAX; ++i) {
|
||||
int64_t tableVer = 0;
|
||||
if (i < SDB_MAX) {
|
||||
tableVer = pSdb->tableVer[i];
|
||||
}
|
||||
tjsonAddStringToObject(tableVersJson, sdbTableName(i), i642str(tableVer));
|
||||
}
|
||||
}
|
||||
|
||||
void mndDumpSdb() {
|
||||
mInfo("start to dump sdb info to sdb.json");
|
||||
|
||||
char path[PATH_MAX * 2] = {0};
|
||||
snprintf(path, sizeof(path), "%s%smnode", tsDataDir, TD_DIRSEP);
|
||||
|
||||
SMsgCb msgCb = {0};
|
||||
msgCb.reportStartupFp = reportStartup;
|
||||
msgCb.sendReqFp = sendReq;
|
||||
msgCb.sendRspFp = sendRsp;
|
||||
msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack
|
||||
tmsgSetDefault(&msgCb);
|
||||
|
||||
walInit();
|
||||
syncInit();
|
||||
|
||||
SMnodeOpt opt = {.msgCb = msgCb};
|
||||
SMnode *pMnode = mndOpen(path, &opt);
|
||||
if (pMnode == NULL) return;
|
||||
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SJson *json = tjsonCreateObject();
|
||||
dumpHeader(pSdb, json);
|
||||
dumpFunc(pSdb, json);
|
||||
dumpDb(pSdb, json);
|
||||
dumpStb(pSdb, json);
|
||||
dumpSma(pSdb, json);
|
||||
dumpVgroup(pSdb, json);
|
||||
dumpTopic(pSdb, json);
|
||||
dumpConsumer(pSdb, json);
|
||||
dumpSubscribe(pSdb, json);
|
||||
dumpOffset(pSdb, json);
|
||||
dumpStream(pSdb, json);
|
||||
dumpAcct(pSdb, json);
|
||||
dumpAuth(pSdb, json);
|
||||
dumpUser(pSdb, json);
|
||||
dumpDnode(pSdb, json);
|
||||
dumpBnode(pSdb, json);
|
||||
dumpSnode(pSdb, json);
|
||||
dumpQnode(pSdb, json);
|
||||
dumpMnode(pSdb, json);
|
||||
dumpCluster(pSdb, json);
|
||||
dumpTrans(pSdb, json);
|
||||
|
||||
char *pCont = tjsonToString(json);
|
||||
int32_t contLen = strlen(pCont);
|
||||
char file[] = "sdb.json";
|
||||
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("failed to write %s since %s", file, terrstr());
|
||||
return;
|
||||
}
|
||||
taosWriteFile(pFile, pCont, contLen);
|
||||
taosWriteFile(pFile, "\n", 1);
|
||||
taosFsyncFile(pFile);
|
||||
taosCloseFile(&pFile);
|
||||
tjsonDelete(json);
|
||||
taosMemoryFree(pCont);
|
||||
|
||||
mInfo("dump sdb info success");
|
||||
}
|
||||
|
||||
#pragma GCC diagnostic pop
|
|
@ -38,13 +38,15 @@ static int32_t mndRetrieveFuncs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
|||
static void mndCancelGetNextFunc(SMnode *pMnode, void *pIter);
|
||||
|
||||
int32_t mndInitFunc(SMnode *pMnode) {
|
||||
SSdbTable table = {.sdbType = SDB_FUNC,
|
||||
.keyType = SDB_KEY_BINARY,
|
||||
.encodeFp = (SdbEncodeFp)mndFuncActionEncode,
|
||||
.decodeFp = (SdbDecodeFp)mndFuncActionDecode,
|
||||
.insertFp = (SdbInsertFp)mndFuncActionInsert,
|
||||
.updateFp = (SdbUpdateFp)mndFuncActionUpdate,
|
||||
.deleteFp = (SdbDeleteFp)mndFuncActionDelete};
|
||||
SSdbTable table = {
|
||||
.sdbType = SDB_FUNC,
|
||||
.keyType = SDB_KEY_BINARY,
|
||||
.encodeFp = (SdbEncodeFp)mndFuncActionEncode,
|
||||
.decodeFp = (SdbDecodeFp)mndFuncActionDecode,
|
||||
.insertFp = (SdbInsertFp)mndFuncActionInsert,
|
||||
.updateFp = (SdbUpdateFp)mndFuncActionUpdate,
|
||||
.deleteFp = (SdbDeleteFp)mndFuncActionDelete,
|
||||
};
|
||||
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_FUNC, mndProcessCreateFuncReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_DROP_FUNC, mndProcessDropFuncReq);
|
||||
|
|
|
@ -2579,3 +2579,14 @@ static void mndCancelGetNextStb(SMnode *pMnode, void *pIter) {
|
|||
SSdb *pSdb = pMnode->pSdb;
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
||||
|
||||
const char *mndGetStbStr(const char *src) {
|
||||
char *posDb = strstr(src, TS_PATH_DELIMITER);
|
||||
if (posDb != NULL) ++posDb;
|
||||
if (posDb == NULL) return src;
|
||||
|
||||
char *posStb = strstr(posDb, TS_PATH_DELIMITER);
|
||||
if (posStb != NULL) ++posStb;
|
||||
if (posStb == NULL) return posDb;
|
||||
return posStb;
|
||||
}
|
|
@ -40,6 +40,8 @@ const char *sdbTableName(ESdbType type) {
|
|||
return "auth";
|
||||
case SDB_ACCT:
|
||||
return "acct";
|
||||
case SDB_STREAM_CK:
|
||||
return "stream_ck";
|
||||
case SDB_STREAM:
|
||||
return "stream";
|
||||
case SDB_OFFSET:
|
||||
|
|
|
@ -2977,10 +2977,9 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst) {
|
||||
static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst, int32_t rowIndex) {
|
||||
SInputColumnInfoData* pColInfo = &pCtx->input;
|
||||
|
||||
int32_t start = pColInfo->startRowIndex;
|
||||
if (pOutput->hasResult) {
|
||||
if (isFirst) {
|
||||
if (pInput->ts > pOutput->ts) {
|
||||
|
@ -2998,7 +2997,7 @@ static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, S
|
|||
pOutput->bytes = pInput->bytes;
|
||||
|
||||
memcpy(pOutput->buf, pInput->buf, pOutput->bytes);
|
||||
firstlastSaveTupleData(pCtx->pSrcBlock, start, pCtx, pOutput);
|
||||
firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pOutput);
|
||||
|
||||
pOutput->hasResult = true;
|
||||
}
|
||||
|
@ -3016,7 +3015,7 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuer
|
|||
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
|
||||
char* data = colDataGetData(pCol, i);
|
||||
SFirstLastRes* pInputInfo = (SFirstLastRes*)varDataVal(data);
|
||||
firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery);
|
||||
firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i);
|
||||
if (!numOfElems) {
|
||||
numOfElems = pInputInfo->hasResult ? 1 : 0;
|
||||
}
|
||||
|
|
|
@ -40,20 +40,3 @@ target_link_libraries(
|
|||
PUBLIC common
|
||||
PUBLIC os
|
||||
)
|
||||
|
||||
add_executable(sdbDump sdbDump.c)
|
||||
target_link_libraries(
|
||||
sdbDump
|
||||
PUBLIC dnode
|
||||
PUBLIC mnode
|
||||
PUBLIC stream
|
||||
PUBLIC sdb
|
||||
PUBLIC os
|
||||
)
|
||||
target_include_directories(
|
||||
sdbDump
|
||||
PUBLIC "${TD_SOURCE_DIR}/include/dnode/mnode"
|
||||
PRIVATE "${TD_SOURCE_DIR}/source/dnode/mnode/impl/inc"
|
||||
PRIVATE "${TD_SOURCE_DIR}/source/dnode/mnode/sdb/inc"
|
||||
PRIVATE "${TD_SOURCE_DIR}/source/dnode/mgmt/node_mgmt/inc"
|
||||
)
|
||||
|
|
|
@ -1,475 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "dmMgmt.h"
|
||||
#include "mndInt.h"
|
||||
#include "sdb.h"
|
||||
#include "tconfig.h"
|
||||
#include "tjson.h"
|
||||
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wunused-result"
|
||||
|
||||
#define TMP_DNODE_DIR TD_TMP_DIR_PATH "dumpsdb"
|
||||
#define TMP_MNODE_DIR TD_TMP_DIR_PATH "dumpsdb" TD_DIRSEP "mnode"
|
||||
#define TMP_SDB_DATA_DIR TD_TMP_DIR_PATH "dumpsdb" TD_DIRSEP "mnode" TD_DIRSEP "data"
|
||||
#define TMP_SDB_SYNC_DIR TD_TMP_DIR_PATH "dumpsdb" TD_DIRSEP "mnode" TD_DIRSEP "sync"
|
||||
#define TMP_SDB_MNODE_JSON TD_TMP_DIR_PATH "dumpsdb" TD_DIRSEP "mnode" TD_DIRSEP "mnode.json"
|
||||
#define TMP_SDB_DATA_FILE TD_TMP_DIR_PATH "dumpsdb" TD_DIRSEP "mnode" TD_DIRSEP "data" TD_DIRSEP "sdb.data"
|
||||
#define TMP_SDB_RAFT_CFG_FILE TD_TMP_DIR_PATH "dumpsdb" TD_DIRSEP "mnode" TD_DIRSEP "sync" TD_DIRSEP "raft_config.json"
|
||||
#define TMP_SDB_RAFT_STORE_FILE TD_TMP_DIR_PATH "dumpsdb" TD_DIRSEP "mnode" TD_DIRSEP "sync" TD_DIRSEP "raft_store.json"
|
||||
|
||||
void reportStartup(const char *name, const char *desc) {}
|
||||
|
||||
void sendRsp(SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); }
|
||||
|
||||
int32_t sendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
char *i642str(int64_t val) {
|
||||
static char str[24] = {0};
|
||||
snprintf(str, sizeof(str), "%" PRId64, val);
|
||||
return str;
|
||||
}
|
||||
|
||||
void dumpFunc(SSdb *pSdb, SJson *json) {}
|
||||
|
||||
void dumpDb(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonCreateObject();
|
||||
tjsonAddItemToObject(json, "dbs", items);
|
||||
|
||||
while (1) {
|
||||
SDbObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToObject(items, "db", item);
|
||||
|
||||
tjsonAddStringToObject(item, "name", pObj->name);
|
||||
tjsonAddStringToObject(item, "acct", pObj->acct);
|
||||
tjsonAddStringToObject(item, "createUser", pObj->createUser);
|
||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||
tjsonAddStringToObject(item, "uid", i642str(pObj->uid));
|
||||
tjsonAddIntegerToObject(item, "cfgVersion", pObj->cfgVersion);
|
||||
tjsonAddIntegerToObject(item, "vgVersion", pObj->vgVersion);
|
||||
tjsonAddIntegerToObject(item, "numOfVgroups", pObj->cfg.numOfVgroups);
|
||||
tjsonAddIntegerToObject(item, "numOfStables", pObj->cfg.numOfStables);
|
||||
tjsonAddIntegerToObject(item, "buffer", pObj->cfg.buffer);
|
||||
tjsonAddIntegerToObject(item, "pageSize", pObj->cfg.pageSize);
|
||||
tjsonAddIntegerToObject(item, "pages", pObj->cfg.pages);
|
||||
tjsonAddIntegerToObject(item, "cacheLastSize", pObj->cfg.cacheLastSize);
|
||||
tjsonAddIntegerToObject(item, "daysPerFile", pObj->cfg.daysPerFile);
|
||||
tjsonAddIntegerToObject(item, "daysToKeep0", pObj->cfg.daysToKeep0);
|
||||
tjsonAddIntegerToObject(item, "daysToKeep1", pObj->cfg.daysToKeep1);
|
||||
tjsonAddIntegerToObject(item, "daysToKeep2", pObj->cfg.daysToKeep2);
|
||||
tjsonAddIntegerToObject(item, "minRows", pObj->cfg.minRows);
|
||||
tjsonAddIntegerToObject(item, "maxRows", pObj->cfg.maxRows);
|
||||
tjsonAddIntegerToObject(item, "precision", pObj->cfg.precision);
|
||||
tjsonAddIntegerToObject(item, "compression", pObj->cfg.compression);
|
||||
tjsonAddIntegerToObject(item, "replications", pObj->cfg.replications);
|
||||
tjsonAddIntegerToObject(item, "strict", pObj->cfg.strict);
|
||||
tjsonAddIntegerToObject(item, "cacheLast", pObj->cfg.cacheLast);
|
||||
tjsonAddIntegerToObject(item, "hashMethod", pObj->cfg.hashMethod);
|
||||
tjsonAddIntegerToObject(item, "numOfRetensions", pObj->cfg.numOfRetensions);
|
||||
tjsonAddIntegerToObject(item, "schemaless", pObj->cfg.schemaless);
|
||||
tjsonAddIntegerToObject(item, "walLevel", pObj->cfg.walLevel);
|
||||
tjsonAddIntegerToObject(item, "walFsyncPeriod", pObj->cfg.walFsyncPeriod);
|
||||
tjsonAddIntegerToObject(item, "walRetentionPeriod", pObj->cfg.walRetentionPeriod);
|
||||
tjsonAddIntegerToObject(item, "walRetentionSize", pObj->cfg.walRetentionSize);
|
||||
tjsonAddIntegerToObject(item, "walRollPeriod", pObj->cfg.walRollPeriod);
|
||||
tjsonAddIntegerToObject(item, "walSegmentSize", pObj->cfg.walSegmentSize);
|
||||
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpStb(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonCreateObject();
|
||||
tjsonAddItemToObject(json, "stbs", items);
|
||||
|
||||
while (1) {
|
||||
SStbObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToObject(items, "stb", item);
|
||||
|
||||
tjsonAddStringToObject(item, "name", pObj->name);
|
||||
tjsonAddStringToObject(item, "db", pObj->db);
|
||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||
tjsonAddStringToObject(item, "uid", i642str(pObj->uid));
|
||||
tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid));
|
||||
tjsonAddIntegerToObject(item, "tagVer", pObj->tagVer);
|
||||
tjsonAddIntegerToObject(item, "colVer", pObj->colVer);
|
||||
tjsonAddIntegerToObject(item, "nextColId", pObj->nextColId);
|
||||
tjsonAddIntegerToObject(item, "watermark1", pObj->watermark[0]);
|
||||
tjsonAddIntegerToObject(item, "watermark2", pObj->watermark[1]);
|
||||
tjsonAddIntegerToObject(item, "maxdelay1", pObj->maxdelay[0]);
|
||||
tjsonAddIntegerToObject(item, "maxdelay2", pObj->maxdelay[1]);
|
||||
tjsonAddIntegerToObject(item, "ttl", pObj->ttl);
|
||||
tjsonAddIntegerToObject(item, "numOfColumns", pObj->numOfColumns);
|
||||
tjsonAddIntegerToObject(item, "numOfTags", pObj->numOfTags);
|
||||
tjsonAddIntegerToObject(item, "commentLen", pObj->commentLen);
|
||||
tjsonAddIntegerToObject(item, "ast1Len", pObj->ast1Len);
|
||||
tjsonAddIntegerToObject(item, "ast2Len", pObj->ast2Len);
|
||||
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpSma(SSdb *pSdb, SJson *json) {}
|
||||
|
||||
void dumpVgroup(SSdb *pSdb, SJson *json) {}
|
||||
|
||||
void dumpTopic(SSdb *pSdb, SJson *json) {}
|
||||
|
||||
void dumpConsumber(SSdb *pSdb, SJson *json) {}
|
||||
|
||||
void dumpSubscribe(SSdb *pSdb, SJson *json) {}
|
||||
|
||||
void dumpOffset(SSdb *pSdb, SJson *json) {}
|
||||
|
||||
void dumpStream(SSdb *pSdb, SJson *json) {}
|
||||
|
||||
void dumpAcct(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonCreateObject();
|
||||
tjsonAddItemToObject(json, "accts", items);
|
||||
|
||||
while (1) {
|
||||
SAcctObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_ACCT, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToObject(items, "acct", item);
|
||||
|
||||
tjsonAddStringToObject(item, "acct", pObj->acct);
|
||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||
tjsonAddIntegerToObject(item, "acctId", pObj->acctId);
|
||||
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpAuth(SSdb *pSdb, SJson *json) {}
|
||||
|
||||
void dumpUser(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonCreateObject();
|
||||
tjsonAddItemToObject(json, "users", items);
|
||||
|
||||
while (1) {
|
||||
SUserObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_USER, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToObject(items, "user", item);
|
||||
|
||||
tjsonAddStringToObject(item, "name", pObj->user);
|
||||
tjsonAddStringToObject(item, "pass", pObj->pass);
|
||||
tjsonAddStringToObject(item, "acct", pObj->acct);
|
||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||
tjsonAddIntegerToObject(item, "superUser", pObj->superUser);
|
||||
tjsonAddIntegerToObject(item, "authVersion", pObj->authVersion);
|
||||
tjsonAddIntegerToObject(item, "numOfReadDbs", taosHashGetSize(pObj->readDbs));
|
||||
tjsonAddIntegerToObject(item, "numOfWriteDbs", taosHashGetSize(pObj->writeDbs));
|
||||
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpDnode(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonCreateObject();
|
||||
tjsonAddItemToObject(json, "dnodes", items);
|
||||
|
||||
while (1) {
|
||||
SDnodeObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToObject(items, "dnode", item);
|
||||
|
||||
tjsonAddIntegerToObject(item, "id", pObj->id);
|
||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||
tjsonAddIntegerToObject(item, "port", pObj->port);
|
||||
tjsonAddStringToObject(item, "fqdn", pObj->fqdn);
|
||||
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpBnode(SSdb *pSdb, SJson *json) {}
|
||||
|
||||
void dumpSnode(SSdb *pSdb, SJson *json) {}
|
||||
|
||||
void dumpQnode(SSdb *pSdb, SJson *json) {}
|
||||
|
||||
void dumpMnode(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonCreateObject();
|
||||
tjsonAddItemToObject(json, "mnodes", items);
|
||||
|
||||
while (1) {
|
||||
SMnodeObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToObject(items, "mnode", item);
|
||||
|
||||
tjsonAddIntegerToObject(item, "id", pObj->id);
|
||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpCluster(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonCreateObject();
|
||||
tjsonAddItemToObject(json, "clusters", items);
|
||||
|
||||
while (1) {
|
||||
SClusterObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_CLUSTER, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToObject(items, "cluster", item);
|
||||
|
||||
tjsonAddStringToObject(item, "id", i642str(pObj->id));
|
||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||
tjsonAddStringToObject(item, "updateTime", i642str(pObj->updateTime));
|
||||
tjsonAddStringToObject(item, "name", pObj->name);
|
||||
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpTrans(SSdb *pSdb, SJson *json) {
|
||||
void *pIter = NULL;
|
||||
SJson *items = tjsonCreateObject();
|
||||
tjsonAddItemToObject(json, "transactions", items);
|
||||
|
||||
while (1) {
|
||||
STrans *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_TRANS, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SJson *item = tjsonCreateObject();
|
||||
tjsonAddItemToObject(items, "trans", item);
|
||||
|
||||
tjsonAddIntegerToObject(item, "id", pObj->id);
|
||||
tjsonAddIntegerToObject(item, "stage", pObj->stage);
|
||||
tjsonAddIntegerToObject(item, "policy", pObj->policy);
|
||||
tjsonAddIntegerToObject(item, "conflict", pObj->conflict);
|
||||
tjsonAddIntegerToObject(item, "exec", pObj->exec);
|
||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||
tjsonAddStringToObject(item, "dbname", pObj->dbname);
|
||||
tjsonAddStringToObject(item, "stbname", pObj->stbname);
|
||||
tjsonAddIntegerToObject(item, "commitLogNum", taosArrayGetSize(pObj->commitActions));
|
||||
tjsonAddIntegerToObject(item, "redoActionNum", taosArrayGetSize(pObj->redoActions));
|
||||
tjsonAddIntegerToObject(item, "undoActionNum", taosArrayGetSize(pObj->undoActions));
|
||||
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpHeader(SSdb *pSdb, SJson *json) {
|
||||
tjsonAddIntegerToObject(json, "sver", 1);
|
||||
tjsonAddStringToObject(json, "applyIndex", i642str(pSdb->applyIndex));
|
||||
tjsonAddStringToObject(json, "applyTerm", i642str(pSdb->applyTerm));
|
||||
tjsonAddStringToObject(json, "applyConfig", i642str(pSdb->applyConfig));
|
||||
|
||||
SJson *maxIdsJson = tjsonCreateObject();
|
||||
tjsonAddItemToObject(json, "maxIds", maxIdsJson);
|
||||
for (int32_t i = 0; i < SDB_MAX; ++i) {
|
||||
int64_t maxId = 0;
|
||||
if (i < SDB_MAX) {
|
||||
maxId = pSdb->maxId[i];
|
||||
}
|
||||
tjsonAddStringToObject(maxIdsJson, sdbTableName(i), i642str(maxId));
|
||||
}
|
||||
|
||||
SJson *tableVersJson = tjsonCreateObject();
|
||||
tjsonAddItemToObject(json, "tableVers", tableVersJson);
|
||||
for (int32_t i = 0; i < SDB_MAX; ++i) {
|
||||
int64_t tableVer = 0;
|
||||
if (i < SDB_MAX) {
|
||||
tableVer = pSdb->tableVer[i];
|
||||
}
|
||||
tjsonAddStringToObject(tableVersJson, sdbTableName(i), i642str(tableVer));
|
||||
}
|
||||
}
|
||||
|
||||
int32_t dumpSdb() {
|
||||
wDebugFlag = 0;
|
||||
mDebugFlag = 0;
|
||||
sDebugFlag = 0;
|
||||
|
||||
SMsgCb msgCb = {0};
|
||||
msgCb.reportStartupFp = reportStartup;
|
||||
msgCb.sendReqFp = sendReq;
|
||||
msgCb.sendRspFp = sendRsp;
|
||||
msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack
|
||||
tmsgSetDefault(&msgCb);
|
||||
walInit();
|
||||
syncInit();
|
||||
|
||||
SMnodeOpt opt = {.msgCb = msgCb};
|
||||
SMnode *pMnode = mndOpen(TMP_MNODE_DIR, &opt);
|
||||
if (pMnode == NULL) return -1;
|
||||
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SJson *json = tjsonCreateObject();
|
||||
dumpHeader(pSdb, json);
|
||||
dumpFunc(pSdb, json);
|
||||
dumpDb(pSdb, json);
|
||||
dumpStb(pSdb, json);
|
||||
dumpSma(pSdb, json);
|
||||
dumpVgroup(pSdb, json);
|
||||
dumpTopic(pSdb, json);
|
||||
dumpConsumber(pSdb, json);
|
||||
dumpSubscribe(pSdb, json);
|
||||
dumpOffset(pSdb, json);
|
||||
dumpStream(pSdb, json);
|
||||
dumpAcct(pSdb, json);
|
||||
dumpAuth(pSdb, json);
|
||||
dumpUser(pSdb, json);
|
||||
dumpDnode(pSdb, json);
|
||||
dumpBnode(pSdb, json);
|
||||
dumpSnode(pSdb, json);
|
||||
dumpQnode(pSdb, json);
|
||||
dumpMnode(pSdb, json);
|
||||
dumpCluster(pSdb, json);
|
||||
dumpTrans(pSdb, json);
|
||||
|
||||
char *pCont = tjsonToString(json);
|
||||
int32_t contLen = strlen(pCont);
|
||||
char file[] = "sdb.json";
|
||||
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to write %s since %s", file, terrstr());
|
||||
return -1;
|
||||
}
|
||||
taosWriteFile(pFile, pCont, contLen);
|
||||
taosWriteFile(pFile, "\n", 1);
|
||||
taosFsyncFile(pFile);
|
||||
taosCloseFile(&pFile);
|
||||
tjsonDelete(json);
|
||||
taosMemoryFree(pCont);
|
||||
taosRemoveDir(TMP_DNODE_DIR);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t parseArgs(int32_t argc, char *argv[]) {
|
||||
for (int32_t i = 1; i < argc; ++i) {
|
||||
if (strcmp(argv[i], "-c") == 0) {
|
||||
if (i < argc - 1) {
|
||||
if (strlen(argv[++i]) >= PATH_MAX) {
|
||||
printf("config file path overflow");
|
||||
return -1;
|
||||
}
|
||||
tstrncpy(configDir, argv[i], PATH_MAX);
|
||||
} else {
|
||||
printf("'-c' requires a parameter, default is %s\n", configDir);
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
printf("-c Configuration directory. \n");
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (taosCreateLog("dumplog", 1, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
|
||||
printf("failed to dump since init log error\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
|
||||
printf("failed to dump since read config error\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
char mnodeJson[PATH_MAX] = {0};
|
||||
char dataFile[PATH_MAX] = {0};
|
||||
char raftCfgFile[PATH_MAX] = {0};
|
||||
char raftStoreFile[PATH_MAX] = {0};
|
||||
snprintf(mnodeJson, PATH_MAX, "%s" TD_DIRSEP "mnode" TD_DIRSEP "mnode.json", tsDataDir);
|
||||
snprintf(dataFile, PATH_MAX, "%s" TD_DIRSEP "mnode" TD_DIRSEP "data" TD_DIRSEP "sdb.data", tsDataDir);
|
||||
snprintf(raftCfgFile, PATH_MAX, "%s" TD_DIRSEP "mnode" TD_DIRSEP "sync" TD_DIRSEP "raft_config.json", tsDataDir);
|
||||
snprintf(raftStoreFile, PATH_MAX, "%s" TD_DIRSEP "mnode" TD_DIRSEP "sync" TD_DIRSEP "raft_store.json", tsDataDir);
|
||||
|
||||
char cmd[PATH_MAX * 2] = {0};
|
||||
snprintf(cmd, sizeof(cmd), "rm -rf %s", TMP_DNODE_DIR);
|
||||
|
||||
system(cmd);
|
||||
#ifdef WINDOWS
|
||||
taosMulMkDir(TMP_SDB_DATA_DIR);
|
||||
taosMulMkDir(TMP_SDB_SYNC_DIR);
|
||||
snprintf(cmd, sizeof(cmd), "cp %s %s 2>nul", mnodeJson, TMP_SDB_MNODE_JSON);
|
||||
system(cmd);
|
||||
snprintf(cmd, sizeof(cmd), "cp %s %s 2>nul", dataFile, TMP_SDB_DATA_FILE);
|
||||
system(cmd);
|
||||
snprintf(cmd, sizeof(cmd), "cp %s %s 2>nul", raftCfgFile, TMP_SDB_RAFT_CFG_FILE);
|
||||
system(cmd);
|
||||
snprintf(cmd, sizeof(cmd), "cp %s %s 2>nul", raftStoreFile, TMP_SDB_RAFT_STORE_FILE);
|
||||
system(cmd);
|
||||
#else
|
||||
snprintf(cmd, sizeof(cmd), "mkdir -p %s", TMP_SDB_DATA_DIR);
|
||||
system(cmd);
|
||||
snprintf(cmd, sizeof(cmd), "mkdir -p %s", TMP_SDB_SYNC_DIR);
|
||||
system(cmd);
|
||||
snprintf(cmd, sizeof(cmd), "cp %s %s 2>/dev/null", mnodeJson, TMP_SDB_MNODE_JSON);
|
||||
system(cmd);
|
||||
snprintf(cmd, sizeof(cmd), "cp %s %s 2>/dev/null", dataFile, TMP_SDB_DATA_FILE);
|
||||
system(cmd);
|
||||
snprintf(cmd, sizeof(cmd), "cp %s %s 2>/dev/null", raftCfgFile, TMP_SDB_RAFT_CFG_FILE);
|
||||
system(cmd);
|
||||
snprintf(cmd, sizeof(cmd), "cp %s %s 2>/dev/null", raftStoreFile, TMP_SDB_RAFT_STORE_FILE);
|
||||
system(cmd);
|
||||
#endif
|
||||
|
||||
strcpy(tsDataDir, TMP_DNODE_DIR);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t main(int32_t argc, char *argv[]) {
|
||||
if (parseArgs(argc, argv) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return dumpSdb();
|
||||
}
|
||||
|
||||
#pragma GCC diagnostic pop
|
Loading…
Reference in New Issue