Merge pull request #11900 from taosdata/feature/dnode
refactor(cluster): adjust mnode sync codes
This commit is contained in:
commit
a76d8df492
|
@ -40,12 +40,12 @@ extern "C" {
|
||||||
|
|
||||||
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
||||||
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
||||||
#define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
#define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
||||||
|
|
||||||
typedef int32_t (*MndMsgFp)(SNodeMsg *pMsg);
|
typedef int32_t (*MndMsgFp)(SNodeMsg *pMsg);
|
||||||
typedef int32_t (*MndInitFp)(SMnode *pMnode);
|
typedef int32_t (*MndInitFp)(SMnode *pMnode);
|
||||||
typedef void (*MndCleanupFp)(SMnode *pMnode);
|
typedef void (*MndCleanupFp)(SMnode *pMnode);
|
||||||
typedef int32_t (*ShowRetrieveFp)(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows);
|
typedef int32_t (*ShowRetrieveFp)(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||||
typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter);
|
typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter);
|
||||||
typedef struct SQWorkerMgmt SQHandle;
|
typedef struct SQWorkerMgmt SQHandle;
|
||||||
|
|
||||||
|
@ -84,32 +84,32 @@ typedef struct {
|
||||||
int64_t timeseriesAllowed;
|
int64_t timeseriesAllowed;
|
||||||
} SGrantInfo;
|
} SGrantInfo;
|
||||||
|
|
||||||
struct SMnode {
|
typedef struct SMnode {
|
||||||
int32_t selfId;
|
int32_t selfId;
|
||||||
int64_t clusterId;
|
int64_t clusterId;
|
||||||
int8_t replica;
|
int8_t replica;
|
||||||
int8_t selfIndex;
|
int8_t selfIndex;
|
||||||
SReplica replicas[TSDB_MAX_REPLICA];
|
SReplica replicas[TSDB_MAX_REPLICA];
|
||||||
tmr_h timer;
|
tmr_h timer;
|
||||||
tmr_h transTimer;
|
tmr_h transTimer;
|
||||||
tmr_h mqTimer;
|
tmr_h mqTimer;
|
||||||
tmr_h telemTimer;
|
tmr_h telemTimer;
|
||||||
char *path;
|
char *path;
|
||||||
int64_t checkTime;
|
int64_t checkTime;
|
||||||
SSdb *pSdb;
|
SSdb *pSdb;
|
||||||
SMgmtWrapper *pWrapper;
|
SMgmtWrapper *pWrapper;
|
||||||
SArray *pSteps;
|
SArray *pSteps;
|
||||||
SQHandle *pQuery;
|
SQHandle *pQuery;
|
||||||
SShowMgmt showMgmt;
|
SShowMgmt showMgmt;
|
||||||
SProfileMgmt profileMgmt;
|
SProfileMgmt profileMgmt;
|
||||||
STelemMgmt telemMgmt;
|
STelemMgmt telemMgmt;
|
||||||
SSyncMgmt syncMgmt;
|
SSyncMgmt syncMgmt;
|
||||||
SHashObj *infosMeta;
|
SHashObj *infosMeta;
|
||||||
SHashObj *perfsMeta;
|
SHashObj *perfsMeta;
|
||||||
SGrantInfo grant;
|
SGrantInfo grant;
|
||||||
MndMsgFp msgFp[TDMT_MAX];
|
MndMsgFp msgFp[TDMT_MAX];
|
||||||
SMsgCb msgCb;
|
SMsgCb msgCb;
|
||||||
};
|
} SMnode;
|
||||||
|
|
||||||
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
|
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
|
||||||
int64_t mndGenerateUid(char *name, int32_t len);
|
int64_t mndGenerateUid(char *name, int32_t len);
|
||||||
|
|
|
@ -22,13 +22,15 @@ static int32_t mndInitWal(SMnode *pMnode) {
|
||||||
|
|
||||||
char path[PATH_MAX] = {0};
|
char path[PATH_MAX] = {0};
|
||||||
snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
|
snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
|
||||||
SWalCfg cfg = {.vgId = 1,
|
SWalCfg cfg = {
|
||||||
.fsyncPeriod = 0,
|
.vgId = 1,
|
||||||
.rollPeriod = -1,
|
.fsyncPeriod = 0,
|
||||||
.segSize = -1,
|
.rollPeriod = -1,
|
||||||
.retentionPeriod = -1,
|
.segSize = -1,
|
||||||
.retentionSize = -1,
|
.retentionPeriod = -1,
|
||||||
.level = TAOS_WAL_FSYNC};
|
.retentionSize = -1,
|
||||||
|
.level = TAOS_WAL_FSYNC,
|
||||||
|
};
|
||||||
pMgmt->pWal = walOpen(path, &cfg);
|
pMgmt->pWal = walOpen(path, &cfg);
|
||||||
if (pMgmt->pWal == NULL) return -1;
|
if (pMgmt->pWal == NULL) return -1;
|
||||||
|
|
||||||
|
@ -54,62 +56,62 @@ static int32_t mndRestoreWal(SMnode *pMnode) {
|
||||||
|
|
||||||
int64_t first = walGetFirstVer(pWal);
|
int64_t first = walGetFirstVer(pWal);
|
||||||
int64_t last = walGetLastVer(pWal);
|
int64_t last = walGetLastVer(pWal);
|
||||||
mDebug("start to restore sdb wal, sdb ver:%" PRId64 ", wal first:%" PRId64 " last:%" PRId64, lastSdbVer, first, last);
|
mDebug("start to restore wal, sdbver:%" PRId64 ", first:%" PRId64 " last:%" PRId64, lastSdbVer, first, last);
|
||||||
|
|
||||||
first = TMAX(lastSdbVer + 1, first);
|
first = TMAX(lastSdbVer + 1, first);
|
||||||
for (int64_t ver = first; ver >= 0 && ver <= last; ++ver) {
|
for (int64_t ver = first; ver >= 0 && ver <= last; ++ver) {
|
||||||
if (walReadWithHandle(pHandle, ver) < 0) {
|
if (walReadWithHandle(pHandle, ver) < 0) {
|
||||||
mError("failed to read by wal handle since %s, ver:%" PRId64, terrstr(), ver);
|
mError("ver:%" PRId64 ", failed to read from wal since %s", ver, terrstr());
|
||||||
goto WAL_RESTORE_OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
SWalHead *pHead = pHandle->pHead;
|
SWalHead *pHead = pHandle->pHead;
|
||||||
int64_t sdbVer = sdbUpdateVer(pSdb, 0);
|
int64_t sdbVer = sdbUpdateVer(pSdb, 0);
|
||||||
if (sdbVer + 1 != ver) {
|
if (sdbVer + 1 != ver) {
|
||||||
terrno = TSDB_CODE_SDB_INVALID_WAl_VER;
|
terrno = TSDB_CODE_SDB_INVALID_WAl_VER;
|
||||||
mError("failed to read wal from sdb, sdbVer:%" PRId64 " inconsistent with ver:%" PRId64, sdbVer, ver);
|
mError("ver:%" PRId64 ", failed to write to sdb, since inconsistent with sdbver:%" PRId64, ver, sdbVer);
|
||||||
goto WAL_RESTORE_OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
mTrace("wal:%" PRId64 ", will be restored, content:%p", ver, pHead->head.body);
|
mTrace("ver:%" PRId64 ", will be restored, content:%p", ver, pHead->head.body);
|
||||||
if (sdbWriteWithoutFree(pSdb, (void *)pHead->head.body) < 0) {
|
if (sdbWriteWithoutFree(pSdb, (void *)pHead->head.body) < 0) {
|
||||||
mError("failed to read wal from sdb since %s, ver:%" PRId64, terrstr(), ver);
|
mError("ver:%" PRId64 ", failed to write to sdb since %s", ver, terrstr());
|
||||||
goto WAL_RESTORE_OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbUpdateVer(pSdb, 1);
|
sdbUpdateVer(pSdb, 1);
|
||||||
mDebug("wal:%" PRId64 ", is restored", ver);
|
mDebug("ver:%" PRId64 ", is restored", ver);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t sdbVer = sdbUpdateVer(pSdb, 0);
|
int64_t sdbVer = sdbUpdateVer(pSdb, 0);
|
||||||
mDebug("restore sdb wal finished, sdb ver:%" PRId64, sdbVer);
|
mDebug("restore wal finished, sdbver:%" PRId64, sdbVer);
|
||||||
|
|
||||||
mndTransPullup(pMnode);
|
mndTransPullup(pMnode);
|
||||||
sdbVer = sdbUpdateVer(pSdb, 0);
|
sdbVer = sdbUpdateVer(pSdb, 0);
|
||||||
mDebug("pullup trans finished, sdb ver:%" PRId64, sdbVer);
|
mDebug("pullup trans finished, sdbver:%" PRId64, sdbVer);
|
||||||
|
|
||||||
if (sdbVer != lastSdbVer) {
|
if (sdbVer != lastSdbVer) {
|
||||||
mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer);
|
mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer);
|
||||||
if (sdbWriteFile(pSdb) != 0) {
|
if (sdbWriteFile(pSdb) != 0) {
|
||||||
goto WAL_RESTORE_OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (walCommit(pWal, sdbVer) != 0) {
|
if (walCommit(pWal, sdbVer) != 0) {
|
||||||
goto WAL_RESTORE_OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (walBeginSnapshot(pWal, sdbVer) < 0) {
|
if (walBeginSnapshot(pWal, sdbVer) < 0) {
|
||||||
goto WAL_RESTORE_OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (walEndSnapshot(pWal) < 0) {
|
if (walEndSnapshot(pWal) < 0) {
|
||||||
goto WAL_RESTORE_OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
WAL_RESTORE_OVER:
|
_OVER:
|
||||||
walCloseReadHandle(pHandle);
|
walCloseReadHandle(pHandle);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -158,11 +160,11 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
|
||||||
int64_t ver = sdbUpdateVer(pSdb, 1);
|
int64_t ver = sdbUpdateVer(pSdb, 1);
|
||||||
if (walWrite(pWal, ver, 1, pRaw, sdbGetRawTotalSize(pRaw)) < 0) {
|
if (walWrite(pWal, ver, 1, pRaw, sdbGetRawTotalSize(pRaw)) < 0) {
|
||||||
sdbUpdateVer(pSdb, -1);
|
sdbUpdateVer(pSdb, -1);
|
||||||
mError("failed to write raw:%p since %s, ver:%" PRId64, pRaw, terrstr(), ver);
|
mError("ver:%" PRId64 ", failed to write raw:%p to wal since %s", ver, pRaw, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
mTrace("raw:%p, write to wal, ver:%" PRId64, pRaw, ver);
|
mTrace("ver:%" PRId64 ", write to wal, raw:%p", ver, pRaw);
|
||||||
walCommit(pWal, ver);
|
walCommit(pWal, ver);
|
||||||
walFsync(pWal, true);
|
walFsync(pWal, true);
|
||||||
|
|
||||||
|
|
|
@ -146,7 +146,6 @@ int32_t mndInitTelem(SMnode* pMnode) {
|
||||||
taosGetEmail(pMgmt->email, sizeof(pMgmt->email));
|
taosGetEmail(pMgmt->email, sizeof(pMgmt->email));
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_TELEM_TIMER, mndProcessTelemTimer);
|
mndSetMsgHandle(pMnode, TDMT_MND_TELEM_TIMER, mndProcessTelemTimer);
|
||||||
|
|
||||||
mDebug("mnode telemetry is initialized");
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,8 +21,8 @@
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "tbase64.h"
|
#include "tbase64.h"
|
||||||
|
|
||||||
#define TSDB_USER_VER_NUMBER 1
|
#define USER_VER_NUMBER 1
|
||||||
#define TSDB_USER_RESERVE_SIZE 64
|
#define USER_RESERVE_SIZE 64
|
||||||
|
|
||||||
static int32_t mndCreateDefaultUsers(SMnode *pMnode);
|
static int32_t mndCreateDefaultUsers(SMnode *pMnode);
|
||||||
static SSdbRaw *mndUserActionEncode(SUserObj *pUser);
|
static SSdbRaw *mndUserActionEncode(SUserObj *pUser);
|
||||||
|
@ -35,7 +35,7 @@ static int32_t mndProcessCreateUserReq(SNodeMsg *pReq);
|
||||||
static int32_t mndProcessAlterUserReq(SNodeMsg *pReq);
|
static int32_t mndProcessAlterUserReq(SNodeMsg *pReq);
|
||||||
static int32_t mndProcessDropUserReq(SNodeMsg *pReq);
|
static int32_t mndProcessDropUserReq(SNodeMsg *pReq);
|
||||||
static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq);
|
static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq);
|
||||||
static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows);
|
static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||||
static void mndCancelGetNextUser(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextUser(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
int32_t mndInitUser(SMnode *pMnode) {
|
int32_t mndInitUser(SMnode *pMnode) {
|
||||||
|
@ -93,9 +93,9 @@ static SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
|
||||||
|
|
||||||
int32_t numOfReadDbs = taosHashGetSize(pUser->readDbs);
|
int32_t numOfReadDbs = taosHashGetSize(pUser->readDbs);
|
||||||
int32_t numOfWriteDbs = taosHashGetSize(pUser->writeDbs);
|
int32_t numOfWriteDbs = taosHashGetSize(pUser->writeDbs);
|
||||||
int32_t size = sizeof(SUserObj) + TSDB_USER_RESERVE_SIZE + (numOfReadDbs + numOfWriteDbs) * TSDB_DB_FNAME_LEN;
|
int32_t size = sizeof(SUserObj) + USER_RESERVE_SIZE + (numOfReadDbs + numOfWriteDbs) * TSDB_DB_FNAME_LEN;
|
||||||
|
|
||||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, TSDB_USER_VER_NUMBER, size);
|
SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, USER_VER_NUMBER, size);
|
||||||
if (pRaw == NULL) goto USER_ENCODE_OVER;
|
if (pRaw == NULL) goto USER_ENCODE_OVER;
|
||||||
|
|
||||||
int32_t dataPos = 0;
|
int32_t dataPos = 0;
|
||||||
|
@ -120,7 +120,7 @@ static SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
|
||||||
db = taosHashIterate(pUser->writeDbs, db);
|
db = taosHashIterate(pUser->writeDbs, db);
|
||||||
}
|
}
|
||||||
|
|
||||||
SDB_SET_RESERVE(pRaw, dataPos, TSDB_USER_RESERVE_SIZE, USER_ENCODE_OVER)
|
SDB_SET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, USER_ENCODE_OVER)
|
||||||
SDB_SET_DATALEN(pRaw, dataPos, USER_ENCODE_OVER)
|
SDB_SET_DATALEN(pRaw, dataPos, USER_ENCODE_OVER)
|
||||||
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
@ -142,7 +142,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
|
||||||
int8_t sver = 0;
|
int8_t sver = 0;
|
||||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto USER_DECODE_OVER;
|
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto USER_DECODE_OVER;
|
||||||
|
|
||||||
if (sver != TSDB_USER_VER_NUMBER) {
|
if (sver != USER_VER_NUMBER) {
|
||||||
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
||||||
goto USER_DECODE_OVER;
|
goto USER_DECODE_OVER;
|
||||||
}
|
}
|
||||||
|
@ -184,7 +184,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
|
||||||
taosHashPut(pUser->writeDbs, db, len, db, TSDB_DB_FNAME_LEN);
|
taosHashPut(pUser->writeDbs, db, len, db, TSDB_DB_FNAME_LEN);
|
||||||
}
|
}
|
||||||
|
|
||||||
SDB_GET_RESERVE(pRaw, dataPos, TSDB_USER_RESERVE_SIZE, USER_DECODE_OVER)
|
SDB_GET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, USER_DECODE_OVER)
|
||||||
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
|
||||||
|
@ -639,7 +639,7 @@ GET_AUTH_OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) {
|
static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||||
SMnode *pMnode = pReq->pNode;
|
SMnode *pMnode = pReq->pNode;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
|
@ -652,29 +652,29 @@ static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pB
|
||||||
if (pShow->pIter == NULL) break;
|
if (pShow->pIter == NULL) break;
|
||||||
|
|
||||||
cols = 0;
|
cols = 0;
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
|
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
|
||||||
|
|
||||||
char name[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
|
char name[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
STR_WITH_MAXSIZE_TO_VARSTR(name, pUser->user, pShow->bytes[cols]);
|
STR_WITH_MAXSIZE_TO_VARSTR(name, pUser->user, pShow->bytes[cols]);
|
||||||
|
|
||||||
colDataAppend(pColInfo, numOfRows, (const char*) name, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)name, false);
|
||||||
|
|
||||||
cols++;
|
cols++;
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
|
||||||
|
|
||||||
const char* src = pUser->superUser? "super":"normal";
|
const char *src = pUser->superUser ? "super" : "normal";
|
||||||
char b[10+VARSTR_HEADER_SIZE] = {0};
|
char b[10 + VARSTR_HEADER_SIZE] = {0};
|
||||||
STR_WITH_SIZE_TO_VARSTR(b, src, strlen(src));
|
STR_WITH_SIZE_TO_VARSTR(b, src, strlen(src));
|
||||||
colDataAppend(pColInfo, numOfRows, (const char*) b, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)b, false);
|
||||||
|
|
||||||
cols++;
|
cols++;
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
|
||||||
colDataAppend(pColInfo, numOfRows, (const char*) &pUser->createdTime, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)&pUser->createdTime, false);
|
||||||
|
|
||||||
cols++;
|
cols++;
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
|
||||||
STR_WITH_MAXSIZE_TO_VARSTR(name, pUser->acct, pShow->bytes[cols]);
|
STR_WITH_MAXSIZE_TO_VARSTR(name, pUser->acct, pShow->bytes[cols]);
|
||||||
colDataAppend(pColInfo, numOfRows, (const char*) name, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)name, false);
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
sdbRelease(pSdb, pUser);
|
sdbRelease(pSdb, pUser);
|
||||||
|
|
|
@ -162,7 +162,4 @@ static int32_t sdbCreateDir(SSdb *pSdb) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t sdbUpdateVer(SSdb *pSdb, int32_t val) {
|
int64_t sdbUpdateVer(SSdb *pSdb, int32_t val) { return atomic_add_fetch_64(&pSdb->curVer, val); }
|
||||||
pSdb->curVer += val;
|
|
||||||
return pSdb->curVer;
|
|
||||||
}
|
|
|
@ -28,7 +28,7 @@ static int32_t sdbRunDeployFp(SSdb *pSdb) {
|
||||||
if (fp == NULL) continue;
|
if (fp == NULL) continue;
|
||||||
|
|
||||||
if ((*fp)(pSdb->pMnode) != 0) {
|
if ((*fp)(pSdb->pMnode) != 0) {
|
||||||
mError("failed to deploy sdb:%d since %s", i, terrstr());
|
mError("failed to deploy sdb:%s since %s", sdbTableName(i), terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -85,7 +85,6 @@ typedef struct {
|
||||||
TAOS* conn;
|
TAOS* conn;
|
||||||
TdThread pid;
|
TdThread pid;
|
||||||
tsem_t cancelSem;
|
tsem_t cancelSem;
|
||||||
int64_t result;
|
|
||||||
} SShellObj;
|
} SShellObj;
|
||||||
|
|
||||||
// shellArguments.c
|
// shellArguments.c
|
||||||
|
|
|
@ -213,13 +213,10 @@ void shellRunSingleCommandImp(char *command) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t oresult = atomic_load_64(&shell.result);
|
|
||||||
|
|
||||||
if (shellRegexMatch(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) {
|
if (shellRegexMatch(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) {
|
||||||
fprintf(stdout, "Database changed.\n\n");
|
fprintf(stdout, "Database changed.\n\n");
|
||||||
fflush(stdout);
|
fflush(stdout);
|
||||||
|
|
||||||
atomic_store_64(&shell.result, 0);
|
|
||||||
taos_free_result(pSql);
|
taos_free_result(pSql);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
|
@ -230,10 +227,7 @@ void shellRunSingleCommandImp(char *command) {
|
||||||
int32_t error_no = 0;
|
int32_t error_no = 0;
|
||||||
|
|
||||||
int32_t numOfRows = shellDumpResult(pSql, fname, &error_no, printMode);
|
int32_t numOfRows = shellDumpResult(pSql, fname, &error_no, printMode);
|
||||||
if (numOfRows < 0) {
|
if (numOfRows < 0) return;
|
||||||
atomic_store_64(&shell.result, 0);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
et = taosGetTimestampUs();
|
et = taosGetTimestampUs();
|
||||||
if (error_no == 0) {
|
if (error_no == 0) {
|
||||||
|
@ -250,8 +244,6 @@ void shellRunSingleCommandImp(char *command) {
|
||||||
}
|
}
|
||||||
|
|
||||||
printf("\n");
|
printf("\n");
|
||||||
|
|
||||||
atomic_store_64(&shell.result, 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) {
|
char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) {
|
||||||
|
@ -398,7 +390,6 @@ int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres) {
|
||||||
row = taos_fetch_row(tres);
|
row = taos_fetch_row(tres);
|
||||||
} while (row != NULL);
|
} while (row != NULL);
|
||||||
|
|
||||||
atomic_store_64(&shell.result, 0);
|
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
|
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
|
@ -766,7 +757,6 @@ void shellWriteHistory() {
|
||||||
|
|
||||||
void shellPrintError(TAOS_RES *tres, int64_t st) {
|
void shellPrintError(TAOS_RES *tres, int64_t st) {
|
||||||
int64_t et = taosGetTimestampUs();
|
int64_t et = taosGetTimestampUs();
|
||||||
atomic_store_ptr(&shell.result, 0);
|
|
||||||
fprintf(stderr, "\nDB error: %s (%.6fs)\n", taos_errstr(tres), (et - st) / 1E6);
|
fprintf(stderr, "\nDB error: %s (%.6fs)\n", taos_errstr(tres), (et - st) / 1E6);
|
||||||
taos_free_result(tres);
|
taos_free_result(tres);
|
||||||
}
|
}
|
||||||
|
@ -872,7 +862,6 @@ void shellGetGrantInfo() {
|
||||||
fprintf(stdout, "Server is Enterprise %s Edition, %s and will expire at %s.\n", serverVersion, sinfo, expiretime);
|
fprintf(stdout, "Server is Enterprise %s Edition, %s and will expire at %s.\n", serverVersion, sinfo, expiretime);
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_store_64(&shell.result, 0);
|
|
||||||
taos_free_result(tres);
|
taos_free_result(tres);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue