TD-10431 fix bug in sdb

This commit is contained in:
Shengliang Guan 2021-11-29 19:35:42 +08:00
parent c7451a9949
commit 6781400508
12 changed files with 200 additions and 171 deletions

View File

@ -114,14 +114,14 @@ typedef enum {
SDB_START = 0, SDB_START = 0,
SDB_TRANS = 1, SDB_TRANS = 1,
SDB_CLUSTER = 2, SDB_CLUSTER = 2,
SDB_DNODE = 3, SDB_MNODE = 3,
SDB_MNODE = 4, SDB_DNODE = 4,
SDB_USER = 5, SDB_USER = 5,
SDB_AUTH = 6, SDB_AUTH = 6,
SDB_ACCT = 7, SDB_ACCT = 7,
SDB_DB = 8,
SDB_VGROUP = 9, SDB_VGROUP = 9,
SDB_STABLE = 10, SDB_STABLE = 9,
SDB_DB = 10,
SDB_FUNC = 11, SDB_FUNC = 11,
SDB_MAX = 12 SDB_MAX = 12
} ESdbType; } ESdbType;
@ -149,10 +149,11 @@ typedef struct SSdbOpt {
const char *path; const char *path;
} SSdbOpt; } SSdbOpt;
SSdb *sdbOpen(SSdbOpt *pOption); SSdb *sdbInit(SSdbOpt *pOption);
void sdbClose(SSdb *pSdb); void sdbCleanup(SSdb *pSdb);
int32_t sdbSetTable(SSdb *pSdb, SSdbTable table);
int32_t sdbDeploy(SSdb *pSdb); int32_t sdbDeploy(SSdb *pSdb);
void sdbSetTable(SSdb *pSdb, SSdbTable table); int32_t sdbReadFile(SSdb *pSdb);
int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw); int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw);
void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey); void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey);

View File

@ -464,7 +464,7 @@ static void *dnodeThreadRoutine(void *param) {
pthread_testcancel(); pthread_testcancel();
if (dndGetStat(pDnode) == DND_STAT_RUNNING) { if (dndGetStat(pDnode) == DND_STAT_RUNNING) {
dndSendStatusMsg(pDnode); // dndSendStatusMsg(pDnode);
} }
} }
} }

View File

@ -41,10 +41,10 @@ typedef struct SMnode {
int8_t selfIndex; int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
tmr_h timer; tmr_h timer;
char *path;
SSdb *pSdb; SSdb *pSdb;
SDnode *pDnode; SDnode *pDnode;
char *path; SArray *pSteps;
SArray steps;
MndMsgFp msgFp[TSDB_MSG_TYPE_MAX]; MndMsgFp msgFp[TSDB_MSG_TYPE_MAX];
SendMsgToDnodeFp sendMsgToDnodeFp; SendMsgToDnodeFp sendMsgToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp; SendMsgToMnodeFp sendMsgToMnodeFp;

View File

@ -111,8 +111,7 @@ int32_t mndInitAcct(SMnode *pMnode) {
.updateFp = (SdbUpdateFp)mnodeAcctActionUpdate, .updateFp = (SdbUpdateFp)mnodeAcctActionUpdate,
.deleteFp = (SdbDeleteFp)mnodeAcctActionDelete}; .deleteFp = (SdbDeleteFp)mnodeAcctActionDelete};
sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
return 0;
} }
void mndCleanupAcct(SMnode *pMnode) {} void mndCleanupAcct(SMnode *pMnode) {}

View File

@ -320,14 +320,11 @@ int32_t mndInitTrans(SMnode *pMnode) {
.insertFp = (SdbInsertFp)mndTransActionInsert, .insertFp = (SdbInsertFp)mndTransActionInsert,
.updateFp = (SdbUpdateFp)mndTransActionUpdate, .updateFp = (SdbUpdateFp)mndTransActionUpdate,
.deleteFp = (SdbDeleteFp)mndTransActionDelete}; .deleteFp = (SdbDeleteFp)mndTransActionDelete};
sdbSetTable(pMnode->pSdb, table);
mInfo("trn module is initialized"); return sdbSetTable(pMnode->pSdb, table);
return 0;
} }
void mndCleanupTrans(SMnode *pMnode) { mInfo("trn module is cleaned up"); } void mndCleanupTrans(SMnode *pMnode) {}
int32_t mndTransPrepare(STrans *pTrans, int32_t (*syncfp)(SSdbRaw *pRaw, void *pData)) { int32_t mndTransPrepare(STrans *pTrans, int32_t (*syncfp)(SSdbRaw *pRaw, void *pData)) {
if (syncfp == NULL) return -1; if (syncfp == NULL) return -1;

View File

@ -223,11 +223,10 @@ int32_t mndInitUser(SMnode *pMnode) {
.insertFp = (SdbInsertFp)mndUserActionInsert, .insertFp = (SdbInsertFp)mndUserActionInsert,
.updateFp = (SdbUpdateFp)mndUserActionUpdate, .updateFp = (SdbUpdateFp)mndUserActionUpdate,
.deleteFp = (SdbDeleteFp)mndUserActionDelete}; .deleteFp = (SdbDeleteFp)mndUserActionDelete};
sdbSetTable(pMnode->pSdb, table);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_USER, mndProcessCreateUserMsg); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_USER, mndProcessCreateUserMsg);
return 0; return sdbSetTable(pMnode->pSdb, table);
} }
void mndCleanupUser(SMnode *pMnode) {} void mndCleanupUser(SMnode *pMnode) {}

View File

@ -36,6 +36,7 @@ int32_t mndGetDnodeId(SMnode *pMnode) {
if (pMnode != NULL) { if (pMnode != NULL) {
return pMnode->dnodeId; return pMnode->dnodeId;
} }
return -1; return -1;
} }
@ -43,6 +44,7 @@ int64_t mndGetClusterId(SMnode *pMnode) {
if (pMnode != NULL) { if (pMnode != NULL) {
return pMnode->clusterId; return pMnode->clusterId;
} }
return -1; return -1;
} }
@ -50,6 +52,8 @@ tmr_h mndGetTimer(SMnode *pMnode) {
if (pMnode != NULL) { if (pMnode != NULL) {
return pMnode->timer; return pMnode->timer;
} }
return NULL;
} }
void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) { void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
@ -76,6 +80,7 @@ static int32_t mndInitTimer(SMnode *pMnode) {
} }
if (pMnode->timer == NULL) { if (pMnode->timer == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
@ -93,12 +98,12 @@ static int32_t mnodeCreateDir(SMnode *pMnode, const char *path) {
pMnode->path = strdup(path); pMnode->path = strdup(path);
if (pMnode->path == NULL) { if (pMnode->path == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno; return -1;
} }
if (taosMkDir(pMnode->path) != 0) { if (taosMkDir(pMnode->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return terrno; return -1;
} }
return 0; return 0;
@ -108,7 +113,7 @@ static int32_t mndInitSdb(SMnode *pMnode) {
SSdbOpt opt = {0}; SSdbOpt opt = {0};
opt.path = pMnode->path; opt.path = pMnode->path;
pMnode->pSdb = sdbOpen(&opt); pMnode->pSdb = sdbInit(&opt);
if (pMnode->pSdb == NULL) { if (pMnode->pSdb == NULL) {
return -1; return -1;
} }
@ -117,10 +122,11 @@ static int32_t mndInitSdb(SMnode *pMnode) {
} }
static int32_t mndDeploySdb(SMnode *pMnode) { return sdbDeploy(pMnode->pSdb); } static int32_t mndDeploySdb(SMnode *pMnode) { return sdbDeploy(pMnode->pSdb); }
static int32_t mndReadSdb(SMnode *pMnode) { return sdbReadFile(pMnode->pSdb); }
static void mndCleanupSdb(SMnode *pMnode) { static void mndCleanupSdb(SMnode *pMnode) {
if (pMnode->pSdb) { if (pMnode->pSdb) {
sdbClose(pMnode->pSdb); sdbCleanup(pMnode->pSdb);
pMnode->pSdb = NULL; pMnode->pSdb = NULL;
} }
} }
@ -130,9 +136,8 @@ static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCle
step.name = name; step.name = name;
step.initFp = initFp; step.initFp = initFp;
step.cleanupFp = cleanupFp; step.cleanupFp = cleanupFp;
if (taosArrayPush(&pMnode->steps, &step) != NULL) { if (taosArrayPush(pMnode->pSteps, &step) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to alloc step:%s since %s", name, terrstr());
return -1; return -1;
} }
@ -140,68 +145,73 @@ static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCle
} }
static int32_t mndInitSteps(SMnode *pMnode) { static int32_t mndInitSteps(SMnode *pMnode) {
if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return terrno; if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return terrno; if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return terrno; if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return terrno; if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return terrno; if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return terrno; if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return terrno; if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return terrno; if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return terrno; if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-stable", mndInitStable, mndCleanupStable) != 0) return terrno; if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return terrno; if (mndAllocStep(pMnode, "mnode-stable", mndInitStable, mndCleanupStable) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return terrno; if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;
if (pMnode->clusterId <= 0) { if (pMnode->clusterId <= 0) {
if (mndAllocStep(pMnode, "mnode-deploy", mndDeploySdb, NULL) != 0) return terrno; if (mndAllocStep(pMnode, "mnode-sdb-deploy", mndDeploySdb, NULL) != 0) return -1;
} else {
if (mndAllocStep(pMnode, "mnode-sdb-read", mndReadSdb, NULL) != 0) return -1;
} }
if (mndAllocStep(pMnode, "mnode-timer", mndInitTimer, NULL) != 0) return terrno; if (mndAllocStep(pMnode, "mnode-timer", mndInitTimer, NULL) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-balance", mndInitBalance, mndCleanupBalance) != 0) return terrno; if (mndAllocStep(pMnode, "mnode-balance", mndInitBalance, mndCleanupBalance) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return terrno; if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return terrno; if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return terrno; if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return terrno; if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-timer", NULL, mndCleanupTimer) != 0) return terrno; if (mndAllocStep(pMnode, "mnode-timer", NULL, mndCleanupTimer) != 0) return -1;
return 0; return 0;
} }
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) { static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
if (pMnode->pSteps == NULL) return;
if (pos == -1) { if (pos == -1) {
pos = taosArrayGetSize(&pMnode->steps); pos = taosArrayGetSize(pMnode->pSteps);
} }
for (int32_t s = pos; s >= 0; s--) { for (int32_t s = pos; s >= 0; s--) {
SMnodeStep *pStep = taosArrayGet(&pMnode->steps, pos); SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, pos);
mDebug("step:%s will cleanup", pStep->name); mDebug("step:%s will cleanup", pStep->name);
if (pStep->cleanupFp != NULL) { if (pStep->cleanupFp != NULL) {
(*pStep->cleanupFp)(pMnode); (*pStep->cleanupFp)(pMnode);
} }
} }
taosArrayClear(&pMnode->steps); taosArrayClear(pMnode->pSteps);
pMnode->pSteps = NULL;
} }
static int32_t mndExecSteps(SMnode *pMnode) { static int32_t mndExecSteps(SMnode *pMnode) {
int32_t size = taosArrayGetSize(&pMnode->steps); int32_t size = taosArrayGetSize(pMnode->pSteps);
for (int32_t pos = 0; pos < size; pos++) { for (int32_t pos = 0; pos < size; pos++) {
SMnodeStep *pStep = taosArrayGet(&pMnode->steps, pos); SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, pos);
if (pStep->initFp == NULL) continue; if (pStep->initFp == NULL) continue;
// (*pMnode->reportProgress)(pStep->name, "start initialize"); // (*pMnode->reportProgress)(pStep->name, "start initialize");
int32_t code = (*pStep->initFp)(pMnode); if ((*pStep->initFp)(pMnode) != 0) {
if (code != 0) { mError("step:%s exec failed since %s, start to cleanup", pStep->name, terrstr());
mError("step:%s exec failed since %s, start to cleanup", pStep->name, tstrerror(code));
mndCleanupSteps(pMnode, pos); mndCleanupSteps(pMnode, pos);
terrno = code; return -1;
return code;
} else { } else {
mDebug("step:%s is initialized", pStep->name); mDebug("step:%s is initialized", pStep->name);
} }
// (*pMnode->reportProgress)(pStep->name, "initialize completed"); // (*pMnode->reportProgress)(pStep->name, "initialize completed");
} }
return 0;
} }
static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
@ -217,20 +227,40 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp; pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp;
if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL || if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL ||
pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0) { pMnode->putMsgToApplyMsgFp == NULL) {
terrno = TSDB_CODE_MND_APP_ERROR; terrno = TSDB_CODE_MND_APP_ERROR;
return terrno; return -1;
}
if (pMnode->dnodeId < 0 || pMnode->clusterId < 0) {
terrno = TSDB_CODE_MND_APP_ERROR;
return -1;
} }
return 0; return 0;
} }
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
mDebug("start to open mnode in %s", path);
SMnode *pMnode = calloc(1, sizeof(SMnode)); SMnode *pMnode = calloc(1, sizeof(SMnode));
if (pMnode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to open mnode since %s", terrstr());
return NULL;
}
pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
if (pMnode->pSteps == NULL) {
free(pMnode);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to open mnode since %s", terrstr());
return NULL;
}
int32_t code = mnodeCreateDir(pMnode, path); int32_t code = mnodeCreateDir(pMnode, path);
if (code != 0) { if (mnodeCreateDir(pMnode, path) != 0) {
mError("failed to set mnode options since %s", terrstr()); mError("failed to open mnode since %s", tstrerror(code));
mndClose(pMnode); mndClose(pMnode);
terrno = code; terrno = code;
return NULL; return NULL;
@ -238,51 +268,66 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
code = mndSetOptions(pMnode, pOption); code = mndSetOptions(pMnode, pOption);
if (code != 0) { if (code != 0) {
mError("failed to open mnode since %s", tstrerror(code));
mndClose(pMnode); mndClose(pMnode);
terrno = code; terrno = code;
mError("failed to set mnode options since %s", terrstr());
return NULL; return NULL;
} }
code = mndInitSteps(pMnode); code = mndInitSteps(pMnode);
if (code != 0) { if (code != 0) {
mError("failed to open mnode since %s", tstrerror(code));
mndClose(pMnode); mndClose(pMnode);
terrno = code; terrno = code;
mError("failed to int steps since %s", terrstr());
return NULL; return NULL;
} }
code = mndExecSteps(pMnode); code = mndExecSteps(pMnode);
if (code != 0) { if (code != 0) {
mError("failed to open mnode since %s", tstrerror(code));
mndClose(pMnode); mndClose(pMnode);
terrno = code; terrno = code;
mError("failed to execute steps since %s", terrstr());
return NULL; return NULL;
} }
mDebug("mnode:%p object is created", pMnode); mDebug("mnode open successfully ");
return pMnode; return pMnode;
} }
void mndClose(SMnode *pMnode) { void mndClose(SMnode *pMnode) {
mndCleanupSteps(pMnode, -1); if (pMnode != NULL) {
tfree(pMnode->path); mDebug("start to close mnode");
tfree(pMnode); mndCleanupSteps(pMnode, -1);
mDebug("mnode:%p object is cleaned up", pMnode); tfree(pMnode->path);
tfree(pMnode);
mDebug("mnode is closed");
}
} }
int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) { int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
assert(1); mDebug("start to alter mnode");
mDebug("mnode is altered");
return 0; return 0;
} }
void mndDestroy(const char *path) { void mndDestroy(const char *path) {
mDebug("mnode in %s will be destroyed", path); mDebug("start to destroy mnode at %s", path);
taosRemoveDir(path); taosRemoveDir(path);
mDebug("mnode is destroyed");
} }
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) { int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
assert(1); pLoad->numOfDnode = 0;
pLoad->numOfMnode = 0;
pLoad->numOfVgroup = 0;
pLoad->numOfDatabase = 0;
pLoad->numOfSuperTable = 0;
pLoad->numOfChildTable = 0;
pLoad->numOfColumn = 0;
pLoad->totalPoints = 0;
pLoad->totalStorage = 0;
pLoad->compStorage = 0;
return 0; return 0;
} }
@ -343,7 +388,8 @@ static void mndProcessRpcMsg(SMnodeMsg *pMsg) {
code = (*fp)(pMnode, pMsg); code = (*fp)(pMnode, pMsg);
if (code != 0) { if (code != 0) {
mError("msg:%p, app:%p type:%s failed to process since %s", pMsg, ahandle, taosMsg[msgType], tstrerror(code)); code = terrno;
mError("msg:%p, app:%p type:%s failed to process since %s", pMsg, ahandle, taosMsg[msgType], terrstr());
goto PROCESS_RPC_END; goto PROCESS_RPC_END;
} }

View File

@ -37,7 +37,7 @@ extern "C" {
#define SDB_MAX_SIZE (32 * 1024) #define SDB_MAX_SIZE (32 * 1024)
typedef struct SSdbRaw { typedef struct SSdbRaw {
int8_t sdb; int8_t type;
int8_t sver; int8_t sver;
int8_t status; int8_t status;
int8_t reserved; int8_t reserved;
@ -46,7 +46,7 @@ typedef struct SSdbRaw {
} SSdbRaw; } SSdbRaw;
typedef struct SSdbRow { typedef struct SSdbRow {
ESdbType sdb; ESdbType type;
ESdbStatus status; ESdbStatus status;
int32_t refCount; int32_t refCount;
char pObj[]; char pObj[];
@ -69,7 +69,6 @@ typedef struct SSdb {
SdbDecodeFp decodeFps[SDB_MAX]; SdbDecodeFp decodeFps[SDB_MAX];
} SSdb; } SSdb;
int32_t sdbReadFile(SSdb *pSdb);
int32_t sdbWriteFile(SSdb *pSdb); int32_t sdbWriteFile(SSdb *pSdb);
int32_t sdbWriteRaw(SSdb *pSdb, SSdbRaw *pRaw); int32_t sdbWriteRaw(SSdb *pSdb, SSdbRaw *pRaw);

View File

@ -16,70 +16,43 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "sdbInt.h" #include "sdbInt.h"
SSdb *sdbOpen(SSdbOpt *pOption) { SSdb *sdbInit(SSdbOpt *pOption) {
mDebug("start to open sdb in %s", pOption->path); mDebug("start to init sdb in %s", pOption->path);
SSdb *pSdb = calloc(1, sizeof(SSdb)); SSdb *pSdb = calloc(1, sizeof(SSdb));
if (pSdb == NULL) { if (pSdb == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to open sdb since %s", terrstr()); mError("failed to init sdb since %s", terrstr());
return NULL; return NULL;
} }
char path[PATH_MAX + 100]; char path[PATH_MAX + 100];
snprintf(path, PATH_MAX + 100, "%s%scur", pOption->path, TD_DIRSEP); snprintf(path, PATH_MAX + 100, "%s", pOption->path);
pSdb->currDir = strdup(path); pSdb->currDir = strdup(path);
snprintf(path, PATH_MAX + 100, "%s%ssync", pOption->path, TD_DIRSEP); snprintf(path, PATH_MAX + 100, "%s%ssync", pOption->path, TD_DIRSEP);
pSdb->syncDir = strdup(path); pSdb->syncDir = strdup(path);
snprintf(path, PATH_MAX + 100, "%s%stmp", pOption->path, TD_DIRSEP); snprintf(path, PATH_MAX + 100, "%s%stmp", pOption->path, TD_DIRSEP);
pSdb->tmpDir = strdup(path); pSdb->tmpDir = strdup(path);
if (pSdb->currDir == NULL || pSdb->currDir == NULL || pSdb->currDir == NULL) { if (pSdb->currDir == NULL || pSdb->currDir == NULL || pSdb->currDir == NULL) {
sdbClose(pSdb); sdbCleanup(pSdb);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to open sdb since %s", terrstr()); mError("failed to init sdb since %s", terrstr());
return NULL; return NULL;
} }
for (int32_t i = 0; i < SDB_MAX; ++i) { for (int32_t i = 0; i < SDB_MAX; ++i) {
int32_t type;
if (pSdb->keyTypes[i] == SDB_KEY_INT32) {
type = TSDB_DATA_TYPE_INT;
} else if (pSdb->keyTypes[i] == SDB_KEY_INT64) {
type = TSDB_DATA_TYPE_BIGINT;
} else {
type = TSDB_DATA_TYPE_BINARY;
}
SHashObj *hash = taosHashInit(64, taosGetDefaultHashFunction(type), true, HASH_NO_LOCK);
if (hash == NULL) {
sdbClose(pSdb);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to open sdb since %s", terrstr());
return NULL;
}
pSdb->hashObjs[i] = hash;
taosInitRWLatch(&pSdb->locks[i]); taosInitRWLatch(&pSdb->locks[i]);
} }
int32_t code = sdbReadFile(pSdb); mDebug("sdb init successfully");
if (code != 0) {
sdbClose(pSdb);
terrno = code;
mError("failed to open sdb since %s", terrstr());
return NULL;
}
mDebug("sdb open successfully");
return pSdb; return pSdb;
} }
void sdbClose(SSdb *pSdb) { void sdbCleanup(SSdb *pSdb) {
mDebug("start to close sdb"); mDebug("start to cleanup sdb");
if (pSdb->curVer != pSdb->lastCommitVer) { if (pSdb->curVer != pSdb->lastCommitVer) {
mDebug("start to write sdb file since curVer:% " PRId64 " and lastCommitVer:%" PRId64 " inequal", pSdb->curVer, mDebug("write sdb file for curVer:% " PRId64 " and lastVer:%" PRId64, pSdb->curVer, pSdb->lastCommitVer);
pSdb->lastCommitVer);
sdbWriteFile(pSdb); sdbWriteFile(pSdb);
} }
@ -104,10 +77,10 @@ void sdbClose(SSdb *pSdb) {
pSdb->hashObjs[i] = NULL; pSdb->hashObjs[i] = NULL;
} }
mDebug("sdb is closed"); mDebug("sdb is cleaned up");
} }
void sdbSetTable(SSdb *pSdb, SSdbTable table) { int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
ESdbType sdb = table.sdbType; ESdbType sdb = table.sdbType;
pSdb->keyTypes[sdb] = table.keyType; pSdb->keyTypes[sdb] = table.keyType;
pSdb->insertFps[sdb] = table.insertFp; pSdb->insertFps[sdb] = table.insertFp;
@ -117,5 +90,25 @@ void sdbSetTable(SSdb *pSdb, SSdbTable table) {
pSdb->encodeFps[sdb] = table.encodeFp; pSdb->encodeFps[sdb] = table.encodeFp;
pSdb->decodeFps[sdb] = table.decodeFp; pSdb->decodeFps[sdb] = table.decodeFp;
mDebug("set sdb handle of table %d", pSdb, table); for (int32_t i = 0; i < SDB_MAX; ++i) {
int32_t type;
if (pSdb->keyTypes[i] == SDB_KEY_INT32) {
type = TSDB_DATA_TYPE_INT;
} else if (pSdb->keyTypes[i] == SDB_KEY_INT64) {
type = TSDB_DATA_TYPE_BIGINT;
} else {
type = TSDB_DATA_TYPE_BINARY;
}
SHashObj *hash = taosHashInit(64, taosGetDefaultHashFunction(type), true, HASH_NO_LOCK);
if (hash == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pSdb->hashObjs[i] = hash;
taosInitRWLatch(&pSdb->locks[i]);
}
return 0;
} }

View File

@ -18,45 +18,41 @@
#include "tchecksum.h" #include "tchecksum.h"
static int32_t sdbCreateDir(SSdb *pSdb) { static int32_t sdbCreateDir(SSdb *pSdb) {
int32_t code = taosMkDir(pSdb->currDir); if (taosMkDir(pSdb->currDir) != 0) {
if (code != 0) { terrno = TAOS_SYSTEM_ERROR(errno);
code = TAOS_SYSTEM_ERROR(errno); mError("failed to create dir:%s since %s", pSdb->currDir, terrstr());
mError("failed to create dir:%s since %s", pSdb->currDir, tstrerror(code)); return -1;
return code;
} }
code = taosMkDir(pSdb->syncDir); if (taosMkDir(pSdb->syncDir) != 0) {
if (code != 0) { terrno = TAOS_SYSTEM_ERROR(errno);
code = TAOS_SYSTEM_ERROR(errno); mError("failed to create dir:%s since %s", pSdb->syncDir, terrstr());
mError("failed to create dir:%s since %s", pSdb->syncDir, tstrerror(code)); return -1;
return code;
} }
code = taosMkDir(pSdb->tmpDir); if (taosMkDir(pSdb->tmpDir) != 0) {
if (code != 0) { terrno = TAOS_SYSTEM_ERROR(errno);
code = TAOS_SYSTEM_ERROR(errno); mError("failed to create dir:%s since %s", pSdb->tmpDir, terrstr());
mError("failed to create dir:%s since %s", pSdb->tmpDir, tstrerror(code)); return -1;
return code;
} }
return 0; return 0;
} }
static int32_t sdbRunDeployFp(SSdb *pSdb) { static int32_t sdbRunDeployFp(SSdb *pSdb) {
mDebug("start to run sdb deploy functions"); mDebug("start to deploy sdb");
for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) { for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) {
SdbDeployFp fp = pSdb->deployFps[i]; SdbDeployFp fp = pSdb->deployFps[i];
if (fp == NULL) continue; if (fp == NULL) continue;
int32_t code = (*fp)(pSdb); if ((*fp)(pSdb) != 0) {
if (code != 0) { mError("failed to deploy sdb:%d since %s", i, terrstr());
mError("failed to deploy sdb:%d since %s", i, tstrerror(code)); return -1;
return code;
} }
} }
mDebug("sdb deploy functions run finished"); mDebug("sdb deploy successfully");
return 0; return 0;
} }
@ -68,9 +64,9 @@ int32_t sdbReadFile(SSdb *pSdb) {
SSdbRaw *pRaw = malloc(SDB_MAX_SIZE); SSdbRaw *pRaw = malloc(SDB_MAX_SIZE);
if (pRaw == NULL) { if (pRaw == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed read file since %s", tstrerror(code)); mError("failed read file since %s", terrstr());
return code; return -1;
} }
char file[PATH_MAX] = {0}; char file[PATH_MAX] = {0};
@ -79,9 +75,9 @@ int32_t sdbReadFile(SSdb *pSdb) {
FileFd fd = taosOpenFileRead(file); FileFd fd = taosOpenFileRead(file);
if (fd <= 0) { if (fd <= 0) {
free(pRaw); free(pRaw);
code = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to read file:%s since %s", file, tstrerror(code)); mError("failed to read file:%s since %s", file, terrstr());
return code; return -1;
} }
while (1) { while (1) {
@ -135,6 +131,7 @@ PARSE_SDB_DATA_ERROR:
taosCloseFile(fd); taosCloseFile(fd);
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
terrno = code;
return code; return code;
} }
@ -142,15 +139,15 @@ int32_t sdbWriteFile(SSdb *pSdb) {
int32_t code = 0; int32_t code = 0;
char tmpfile[PATH_MAX] = {0}; char tmpfile[PATH_MAX] = {0};
snprintf(tmpfile, sizeof(tmpfile), "%ssdb.data", pSdb->tmpDir); snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
char curfile[PATH_MAX] = {0}; char curfile[PATH_MAX] = {0};
snprintf(curfile, sizeof(curfile), "%ssdb.data", pSdb->currDir); snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
FileFd fd = taosOpenFileCreateWrite(tmpfile); FileFd fd = taosOpenFileCreateWrite(tmpfile);
if (fd <= 0) { if (fd <= 0) {
code = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to open file:%s for write since %s", tmpfile, tstrerror(code)); mError("failed to open file:%s for write since %s", tmpfile, terrstr());
return code; return -1;
} }
for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) { for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) {
@ -220,23 +217,21 @@ int32_t sdbWriteFile(SSdb *pSdb) {
mDebug("write file:%s successfully", curfile); mDebug("write file:%s successfully", curfile);
} }
terrno = code;
return code; return code;
} }
int32_t sdbDeploy(SSdb *pSdb) { int32_t sdbDeploy(SSdb *pSdb) {
int32_t code = sdbCreateDir(pSdb); if (sdbCreateDir(pSdb) != 0) {
if (code != 0) { return -1;
return code;
} }
code = sdbRunDeployFp(pSdb); if (sdbRunDeployFp(pSdb) != 0) {
if (code != 0) { return -1;
return code;
} }
code = sdbWriteFile(pSdb); if (sdbWriteFile(pSdb) != 0) {
if (code != 0) { return -1;
return code;
} }
return 0; return 0;

View File

@ -49,7 +49,7 @@ static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, void *pKey) {
static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
int32_t code = 0; int32_t code = 0;
SRWLatch *pLock = &pSdb->locks[pRow->sdb]; SRWLatch *pLock = &pSdb->locks[pRow->type];
taosWLockLatch(pLock); taosWLockLatch(pLock);
SSdbRow *pDstRow = taosHashGet(hash, pRow->pObj, keySize); SSdbRow *pDstRow = taosHashGet(hash, pRow->pObj, keySize);
@ -70,7 +70,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
taosWUnLockLatch(pLock); taosWUnLockLatch(pLock);
SdbInsertFp insertFp = pSdb->insertFps[pRow->sdb]; SdbInsertFp insertFp = pSdb->insertFps[pRow->type];
if (insertFp != NULL) { if (insertFp != NULL) {
code = (*insertFp)(pSdb, pRow->pObj); code = (*insertFp)(pSdb, pRow->pObj);
if (code != 0) { if (code != 0) {
@ -88,7 +88,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
int32_t code = 0; int32_t code = 0;
SRWLatch *pLock = &pSdb->locks[pRow->sdb]; SRWLatch *pLock = &pSdb->locks[pRow->type];
taosRLockLatch(pLock); taosRLockLatch(pLock);
SSdbRow **ppDstRow = taosHashGet(hash, pRow->pObj, keySize); SSdbRow **ppDstRow = taosHashGet(hash, pRow->pObj, keySize);
@ -101,7 +101,7 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
pRow->status = pRaw->status; pRow->status = pRaw->status;
taosRUnLockLatch(pLock); taosRUnLockLatch(pLock);
SdbUpdateFp updateFp = pSdb->updateFps[pRow->sdb]; SdbUpdateFp updateFp = pSdb->updateFps[pRow->type];
if (updateFp != NULL) { if (updateFp != NULL) {
code = (*updateFp)(pSdb, pRow->pObj, pDstRow->pObj); code = (*updateFp)(pSdb, pRow->pObj, pDstRow->pObj);
} }
@ -113,7 +113,7 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
int32_t code = 0; int32_t code = 0;
SRWLatch *pLock = &pSdb->locks[pRow->sdb]; SRWLatch *pLock = &pSdb->locks[pRow->type];
taosWLockLatch(pLock); taosWLockLatch(pLock);
SSdbRow **ppDstRow = taosHashGet(hash, pRow->pObj, keySize); SSdbRow **ppDstRow = taosHashGet(hash, pRow->pObj, keySize);
@ -128,7 +128,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
taosHashRemove(hash, pDstRow->pObj, keySize); taosHashRemove(hash, pDstRow->pObj, keySize);
taosWUnLockLatch(pLock); taosWUnLockLatch(pLock);
SdbDeleteFp deleteFp = pSdb->deleteFps[pDstRow->sdb]; SdbDeleteFp deleteFp = pSdb->deleteFps[pDstRow->type];
if (deleteFp != NULL) { if (deleteFp != NULL) {
code = (*deleteFp)(pSdb, pDstRow->pObj); code = (*deleteFp)(pSdb, pDstRow->pObj);
} }
@ -139,18 +139,18 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
} }
int32_t sdbWriteRaw(SSdb *pSdb, SSdbRaw *pRaw) { int32_t sdbWriteRaw(SSdb *pSdb, SSdbRaw *pRaw) {
SHashObj *hash = sdbGetHash(pSdb, pRaw->sdb); SHashObj *hash = sdbGetHash(pSdb, pRaw->type);
if (hash == NULL) return terrno; if (hash == NULL) return terrno;
SdbDecodeFp decodeFp = pSdb->decodeFps[pRaw->sdb]; SdbDecodeFp decodeFp = pSdb->decodeFps[pRaw->type];
SSdbRow *pRow = (*decodeFp)(pRaw); SSdbRow *pRow = (*decodeFp)(pRaw);
if (pRow == NULL) { if (pRow == NULL) {
return terrno; return terrno;
} }
pRow->sdb = pRaw->sdb; pRow->type = pRaw->type;
int32_t keySize = sdbGetkeySize(pSdb, pRow->sdb, pRow->pObj); int32_t keySize = sdbGetkeySize(pSdb, pRow->type, pRow->pObj);
int32_t code = TSDB_CODE_SDB_INVALID_ACTION_TYPE; int32_t code = TSDB_CODE_SDB_INVALID_ACTION_TYPE;
switch (pRaw->status) { switch (pRaw->status) {
@ -217,9 +217,9 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
if (pObj == NULL) return; if (pObj == NULL) return;
SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow)); SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow));
if (pRow->sdb >= SDB_MAX || pRow->sdb <= SDB_START) return; if (pRow->type >= SDB_MAX || pRow->type <= SDB_START) return;
SRWLatch *pLock = &pSdb->locks[pRow->sdb]; SRWLatch *pLock = &pSdb->locks[pRow->type];
taosRLockLatch(pLock); taosRLockLatch(pLock);
int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1); int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1);
@ -257,10 +257,10 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
void sdbCancelFetch(SSdb *pSdb, void *pIter) { void sdbCancelFetch(SSdb *pSdb, void *pIter) {
if (pIter == NULL) return; if (pIter == NULL) return;
SSdbRow *pRow = *(SSdbRow **)pIter; SSdbRow *pRow = *(SSdbRow **)pIter;
SHashObj *hash = sdbGetHash(pSdb, pRow->sdb); SHashObj *hash = sdbGetHash(pSdb, pRow->type);
if (hash == NULL) return; if (hash == NULL) return;
SRWLatch *pLock = &pSdb->locks[pRow->sdb]; SRWLatch *pLock = &pSdb->locks[pRow->type];
taosRLockLatch(pLock); taosRLockLatch(pLock);
taosHashCancelIterate(hash, pIter); taosHashCancelIterate(hash, pIter);
taosRUnLockLatch(pLock); taosRUnLockLatch(pLock);

View File

@ -16,14 +16,14 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "sdbInt.h" #include "sdbInt.h"
SSdbRaw *sdbAllocRaw(ESdbType sdb, int8_t sver, int32_t dataLen) { SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) {
SSdbRaw *pRaw = calloc(1, dataLen + sizeof(SSdbRaw)); SSdbRaw *pRaw = calloc(1, dataLen + sizeof(SSdbRaw));
if (pRaw == NULL) { if (pRaw == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pRaw->sdb = sdb; pRaw->type = type;
pRaw->sver = sver; pRaw->sver = sver;
pRaw->dataLen = dataLen; pRaw->dataLen = dataLen;
return pRaw; return pRaw;