diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 20de27e59d..e03d3ffd18 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -55,9 +55,9 @@ typedef struct { int32_t mnodeInit(SMnodePara para); void mnodeCleanup(); -int32_t mnodeDeploy(char *path, SMnodeCfg *pCfg); -void mnodeUnDeploy(char *path); -int32_t mnodeStart(char *path, SMnodeCfg *pCfg); +int32_t mnodeDeploy(SMnodeCfg *pCfg); +void mnodeUnDeploy(); +int32_t mnodeStart(SMnodeCfg *pCfg); int32_t mnodeAlter(SMnodeCfg *pCfg); void mnodeStop(); diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index f4a8ccd06e..90c5ef0c4a 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -20,69 +20,105 @@ extern "C" { #endif -#define SDB_GET_BINARY_VAL(pData, dataLen, val, valLen, code) \ - if (code == 0) { \ - if ((dataLen) >= (valLen)) { \ - memcpy((val), (char *)(pData), (valLen)); \ - (dataLen) -= (valLen); \ - (pData) = (char *)(pData) + (valLen); \ - } else { \ - code = TSDB_CODE_SDB_INVALID_DATA_LEN; \ - } \ +#define SDB_GET_INT64(pData, pRow, dataPos, val) \ + { \ + if (sdbGetRawInt64(pRaw, dataPos, val) != 0) { \ + sdbFreeRow(pRow); \ + return NULL; \ + } \ + dataPos += sizeof(int64_t); \ } -#define SDB_GET_INT32_VAL(pData, dataLen, val, code) \ - if (code == 0) { \ - if (dataLen >= sizeof(int32_t)) { \ - *(int32_t *)(pData) = (int32_t)(val); \ - (dataLen) -= sizeof(int32_t); \ - (pData) = (char *)(pData) + sizeof(int32_t); \ - } else { \ - code = TSDB_CODE_SDB_INVALID_DATA_LEN; \ - } \ +#define SDB_GET_INT32(pData, pRow, dataPos, val) \ + { \ + if (sdbGetRawInt32(pRaw, dataPos, val) != 0) { \ + sdbFreeRow(pRow); \ + return NULL; \ + } \ + dataPos += sizeof(int32_t); \ } -#define SDB_GET_INT64_VAL(pData, dataLen, val, code) \ - if (code == 0) { \ - if (dataLen >= sizeof(int64_t)) { \ - *(int64_t *)(pData) = (int64_t)(val); \ - (dataLen) -= sizeof(int64_t); \ - (pData) = (char *)(pData) + sizeof(int64_t); \ - } else { \ - code = TSDB_CODE_SDB_INVALID_DATA_LEN; \ - } \ +#define SDB_GET_INT8(pData, pRow, dataPos, val) \ + { \ + if (sdbGetRawInt8(pRaw, dataPos, val) != 0) { \ + sdbFreeRow(pRow); \ + return NULL; \ + } \ + dataPos += sizeof(int8_t); \ } -#define SDB_SET_INT64_VAL(pData, dataLen, val) \ - { \ - *(int64_t *)(pData) = (int64_t)(val); \ - (dataLen) += sizeof(int64_t); \ - (pData) += sizeof(int64_t); \ +#define SDB_GET_BINARY(pRaw, pRow, dataPos, val, valLen) \ + { \ + if (sdbGetRawBinary(pRaw, dataPos, val, valLen) != 0) { \ + sdbFreeRow(pRow); \ + return NULL; \ + } \ + dataPos += valLen; \ } -#define SDB_SET_INT32_VAL(pData, dataLen, val) \ - { \ - *(int32_t *)(pData) = (int32_t)(val); \ - (dataLen) += sizeof(int32_t); \ - (pData) += sizeof(int32_t); \ +#define SDB_SET_INT64(pData, dataPos, val) \ + { \ + if (sdbSetRawInt64(pRaw, dataPos, val) != 0) { \ + sdbFreeRaw(pRaw); \ + return NULL; \ + }; \ + dataPos += sizeof(int64_t); \ } -#define SDB_SET_BINARY_VAL(pData, dataLen, val, valLen) \ - { \ - memcpy((char *)(pData), (val), (valLen)); \ - (dataLen) += (valLen); \ - (pData) += (valLen); \ +#define SDB_SET_INT32(pData, dataPos, val) \ + { \ + if (sdbSetRawInt32(pRaw, dataPos, val) != 0) { \ + sdbFreeRaw(pRaw); \ + return NULL; \ + }; \ + dataPos += sizeof(int32_t); \ } +#define SDB_SET_INT8(pData, dataPos, val) \ + { \ + if (sdbSetRawInt8(pRaw, dataPos, val) != 0) { \ + sdbFreeRaw(pRaw); \ + return NULL; \ + }; \ + dataPos += sizeof(int8_t); \ + } + +#define SDB_SET_BINARY(pRaw, dataPos, val, valLen) \ + { \ + if (sdbSetRawBinary(pRaw, dataPos, val, valLen) != 0) { \ + sdbFreeRaw(pRaw); \ + return NULL; \ + }; \ + dataPos += valLen; \ + } + +#define SDB_SET_DATALEN(pRaw, dataLen) \ + { \ + if (sdbSetRawDataLen(pRaw, dataLen) != 0) { \ + sdbFreeRaw(pRaw); \ + return NULL; \ + }; \ + } + +typedef struct SSdbRaw SSdbRaw; +typedef struct SSdbRow SSdbRow; +typedef enum { SDB_KEY_BINARY = 1, SDB_KEY_INT32 = 2, SDB_KEY_INT64 = 3 } EKeyType; +typedef enum { + SDB_STATUS_CREATING = 1, + SDB_STATUS_READY = 2, + SDB_STATUS_DROPPING = 3, + SDB_STATUS_DROPPED = 4 +} ESdbStatus; + typedef enum { SDB_START = 0, SDB_TRANS = 1, SDB_CLUSTER = 2, SDB_DNODE = 3, SDB_MNODE = 4, - SDB_ACCT = 5, + SDB_USER = 5, SDB_AUTH = 6, - SDB_USER = 7, + SDB_ACCT = 7, SDB_DB = 8, SDB_VGROUP = 9, SDB_STABLE = 10, @@ -90,26 +126,11 @@ typedef enum { SDB_MAX = 12 } ESdbType; -typedef enum { SDB_ACTION_INSERT = 1, SDB_ACTION_UPDATE = 2, SDB_ACTION_DELETE = 3 } ESdbAction; -typedef enum { SDB_KEY_BINARY = 1, SDB_KEY_INT32 = 2, SDB_KEY_INT64 = 3 } EKeyType; -typedef enum { SDB_STATUS_CREATING = 1, SDB_STATUS_READY = 2, SDB_STATUS_DROPPING = 3 } ESdbStatus; - -typedef struct { - int8_t type; - int8_t sver; - int8_t status; - int8_t action; - int8_t reserved[4]; - int32_t cksum; - int32_t dataLen; - char data[]; -} SSdbRaw; - typedef int32_t (*SdbInsertFp)(void *pObj); typedef int32_t (*SdbUpdateFp)(void *pSrcObj, void *pDstObj); typedef int32_t (*SdbDeleteFp)(void *pObj); typedef int32_t (*SdbDeployFp)(); -typedef void *(*SdbDecodeFp)(SSdbRaw *pRaw); +typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw); typedef SSdbRaw *(*SdbEncodeFp)(void *pObj); typedef struct { @@ -127,19 +148,38 @@ int32_t sdbInit(); void sdbCleanup(); void sdbSetTable(SSdbTable table); -int32_t sdbRead(); +int32_t sdbOpen(); +void sdbClose(); int32_t sdbWrite(SSdbRaw *pRaw); -int32_t sdbCommit(); int32_t sdbDeploy(); void sdbUnDeploy(); void *sdbAcquire(ESdbType sdb, void *pKey); void sdbRelease(void *pObj); -void *sdbFetch(ESdbType sdb, void *pIter); -void sdbCancelFetch(ESdbType sdb, void *pIter); +void *sdbFetch(ESdbType sdb, void *pIter, void **ppObj); +void sdbCancelFetch(void *pIter); int32_t sdbGetSize(ESdbType sdb); +SSdbRaw *sdbAllocRaw(ESdbType sdb, int8_t sver, int32_t dataLen); +void sdbFreeRaw(SSdbRaw *pRaw); +int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val); +int32_t sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val); +int32_t sdbSetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t val); +int32_t sdbSetRawBinary(SSdbRaw *pRaw, int32_t dataPos, const char *pVal, int32_t valLen); +int32_t sdbSetRawDataLen(SSdbRaw *pRaw, int32_t dataLen); +int32_t sdbSetRawStatus(SSdbRaw *pRaw, ESdbStatus status); +int32_t sdbGetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t *val); +int32_t sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val); +int32_t sdbGetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t *val); +int32_t sdbGetRawBinary(SSdbRaw *pRaw, int32_t dataPos, char *pVal, int32_t valLen); +int32_t sdbGetRawSoftVer(SSdbRaw *pRaw, int8_t *sver); +int32_t sdbGetRawTotalSize(SSdbRaw *pRaw); + +SSdbRow *sdbAllocRow(int32_t objSize); +void sdbFreeRow(SSdbRow *pRow); +void *sdbGetRowObj(SSdbRow *pRow); + #ifdef __cplusplus } #endif diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b644f4c9a5..022c3e9096 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -66,13 +66,14 @@ int32_t* taosGetErrno(); #define TSDB_CODE_INVALID_PTR TAOS_DEF_ERROR_CODE(0, 0x0103) #define TSDB_CODE_MEMORY_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0104) #define TSDB_CODE_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0106) -#define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0107) -#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0108) -#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0109) -#define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x010A) -#define TSDB_CODE_REF_INVALID_ID TAOS_DEF_ERROR_CODE(0, 0x010B) -#define TSDB_CODE_REF_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x010C) -#define TSDB_CODE_REF_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x010D) +#define TSDB_CODE_CHECKSUM_ERROR TAOS_DEF_ERROR_CODE(0, 0x0107) +#define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0108) +#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0109) +#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x010A) +#define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x010B) +#define TSDB_CODE_REF_INVALID_ID TAOS_DEF_ERROR_CODE(0, 0x010C) +#define TSDB_CODE_REF_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x010D) +#define TSDB_CODE_REF_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x010E) //client #define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) //"Invalid Operation") @@ -133,17 +134,18 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_FAILED_TO_CREATE_DIR TAOS_DEF_ERROR_CODE(0, 0x0313) //"failed to create mnode dir") #define TSDB_CODE_MND_FAILED_TO_INIT_STEP TAOS_DEF_ERROR_CODE(0, 0x0314) //"failed to init components") - #define TSDB_CODE_SDB_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0320) #define TSDB_CODE_SDB_OBJ_ALREADY_THERE TAOS_DEF_ERROR_CODE(0, 0x0321) #define TSDB_CODE_SDB_OBJ_NOT_THERE TAOS_DEF_ERROR_CODE(0, 0x0322) -#define TSDB_CODE_SDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0323) -#define TSDB_CODE_SDB_INVALID_KEY_TYPE TAOS_DEF_ERROR_CODE(0, 0x0324) -#define TSDB_CODE_SDB_INVALID_ACTION_TYPE TAOS_DEF_ERROR_CODE(0, 0x0325) -#define TSDB_CODE_SDB_INVALID_STATUS_TYPE TAOS_DEF_ERROR_CODE(0, 0x0326) -#define TSDB_CODE_SDB_INVALID_DATA_VER TAOS_DEF_ERROR_CODE(0, 0x0327) -#define TSDB_CODE_SDB_INVALID_DATA_LEN TAOS_DEF_ERROR_CODE(0, 0x0328) -#define TSDB_CODE_SDB_INVALID_META_ROW TAOS_DEF_ERROR_CODE(0, 0x0329) +#define TSDB_CODE_SDB_OBJ_CREATING TAOS_DEF_ERROR_CODE(0, 0x0323) +#define TSDB_CODE_SDB_OBJ_DROPPING TAOS_DEF_ERROR_CODE(0, 0x0324) +#define TSDB_CODE_SDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0325) +#define TSDB_CODE_SDB_INVALID_KEY_TYPE TAOS_DEF_ERROR_CODE(0, 0x0326) +#define TSDB_CODE_SDB_INVALID_ACTION_TYPE TAOS_DEF_ERROR_CODE(0, 0x0327) +#define TSDB_CODE_SDB_INVALID_STATUS_TYPE TAOS_DEF_ERROR_CODE(0, 0x0328) +#define TSDB_CODE_SDB_INVALID_DATA_VER TAOS_DEF_ERROR_CODE(0, 0x0329) +#define TSDB_CODE_SDB_INVALID_DATA_LEN TAOS_DEF_ERROR_CODE(0, 0x032A) +#define TSDB_CODE_SDB_INVALID_DATA_CONTENT TAOS_DEF_ERROR_CODE(0, 0x032B) #define TSDB_CODE_MND_DNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0330) //"DNode already exists") #define TSDB_CODE_MND_DNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0331) //"DNode does not exist") diff --git a/source/dnode/mgmt/src/dnodeMnode.c b/source/dnode/mgmt/src/dnodeMnode.c index f6726bf981..48cc1cb20d 100644 --- a/source/dnode/mgmt/src/dnodeMnode.c +++ b/source/dnode/mgmt/src/dnodeMnode.c @@ -136,8 +136,8 @@ static int32_t dnodeWriteMnodeFile() { char *content = calloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"deployed\": \"%d\",\n", tsMnode.dropped); - len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", tsMnode.dropped); + len += snprintf(content + len, maxLen - len, " \"deployed\": \"%d\",\n", tsMnode.deployed); + len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\"\n", tsMnode.dropped); len += snprintf(content + len, maxLen - len, "}\n"); fwrite(content, 1, len, fp); @@ -180,7 +180,7 @@ static int32_t dnodeStartMnode() { tsMnode.deployed = 1; taosWUnLockLatch(&tsMnode.latch); - return code; + return mnodeStart(NULL); } static void dnodeStopMnode() { @@ -212,14 +212,14 @@ static int32_t dnodeUnDeployMnode() { } dnodeStopMnode(); - mnodeUnDeploy(tsMnodeDir); + mnodeUnDeploy(); dnodeWriteMnodeFile(); return code; } static int32_t dnodeDeployMnode(SMnodeCfg *pCfg) { - int32_t code = mnodeDeploy(tsMnodeDir, pCfg); + int32_t code = mnodeDeploy(pCfg); if (code != 0) { dError("failed to deploy mnode since %s", tstrerror(code)); return code; @@ -536,7 +536,7 @@ static int32_t dnodeOpenMnode() { SMnodeCfg cfg = {.replica = 1}; cfg.replicas[0].port = tsServerPort; tstrncpy(cfg.replicas[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN); - return dnodeDeployMnode(&cfg); + code = dnodeDeployMnode(&cfg); } else { dInfo("start to open mnode"); return dnodeStartMnode(); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 21db85fcfa..f52e60dbad 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -134,10 +134,11 @@ static int32_t mnodeAllocInitSteps() { } static int32_t mnodeAllocStartSteps() { - struct SSteps *steps = taosStepInit(7, NULL); + struct SSteps *steps = taosStepInit(8, NULL); if (steps == NULL) return -1; taosStepAdd(steps, "mnode-timer", mnodeInitTimer, NULL); + taosStepAdd(steps, "mnode-sdb-file", sdbOpen, sdbClose); taosStepAdd(steps, "mnode-balance", mnodeInitBalance, mnodeCleanupBalance); taosStepAdd(steps, "mnode-profile", mnodeInitProfile, mnodeCleanupProfile); taosStepAdd(steps, "mnode-show", mnodeInitShow, mnodeCleanUpShow); @@ -170,7 +171,7 @@ int32_t mnodeInit(SMnodePara para) { void mnodeCleanup() { taosStepCleanup(tsMint.pInitSteps); } -int32_t mnodeDeploy(char *path, SMnodeCfg *pCfg) { +int32_t mnodeDeploy(SMnodeCfg *pCfg) { if (tsMint.para.dnodeId <= 0 && tsMint.para.clusterId <= 0) { if (sdbDeploy() != 0) { mError("failed to deploy sdb since %s", terrstr()); @@ -182,9 +183,9 @@ int32_t mnodeDeploy(char *path, SMnodeCfg *pCfg) { return 0; } -void mnodeUnDeploy(char *path) { sdbUnDeploy(); } +void mnodeUnDeploy() { sdbUnDeploy(); } -int32_t mnodeStart(char *path, SMnodeCfg *pCfg) { return taosStepExec(tsMint.pStartSteps); } +int32_t mnodeStart(SMnodeCfg *pCfg) { return taosStepExec(tsMint.pStartSteps); } int32_t mnodeAlter(SMnodeCfg *pCfg) { return 0; } diff --git a/source/dnode/mnode/impl/src/mnodeAcct.c b/source/dnode/mnode/impl/src/mnodeAcct.c index 381d6a184e..6f5c498ed2 100644 --- a/source/dnode/mnode/impl/src/mnodeAcct.c +++ b/source/dnode/mnode/impl/src/mnodeAcct.c @@ -16,69 +16,56 @@ #define _DEFAULT_SOURCE #include "mnodeInt.h" -#define ACCT_VER 1 +#define SDB_ACCT_VER 1 static SSdbRaw *mnodeAcctActionEncode(SAcctObj *pAcct) { - SSdbRaw *pRaw = calloc(1, sizeof(SAcctObj) + sizeof(SSdbRaw)); - if (pRaw == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } + SSdbRaw *pRaw = sdbAllocRaw(SDB_ACCT, SDB_ACCT_VER, sizeof(SAcctObj)); + if (pRaw == NULL) return NULL; - int32_t dataLen = 0; - char *pData = pRaw->data; - SDB_SET_BINARY_VAL(pData, dataLen, pAcct->acct, TSDB_USER_LEN) - SDB_SET_INT64_VAL(pData, dataLen, pAcct->createdTime) - SDB_SET_INT64_VAL(pData, dataLen, pAcct->updateTime) - SDB_SET_INT32_VAL(pData, dataLen, pAcct->acctId) - SDB_SET_INT32_VAL(pData, dataLen, pAcct->status) - SDB_SET_INT32_VAL(pData, dataLen, pAcct->cfg.maxUsers) - SDB_SET_INT32_VAL(pData, dataLen, pAcct->cfg.maxDbs) - SDB_SET_INT32_VAL(pData, dataLen, pAcct->cfg.maxTimeSeries) - SDB_SET_INT32_VAL(pData, dataLen, pAcct->cfg.maxStreams) - SDB_SET_INT64_VAL(pData, dataLen, pAcct->cfg.maxStorage) - SDB_SET_INT32_VAL(pData, dataLen, pAcct->cfg.accessState) + int32_t dataPos = 0; + SDB_SET_BINARY(pRaw, dataPos, pAcct->acct, TSDB_USER_LEN) + SDB_SET_INT64(pRaw, dataPos, pAcct->createdTime) + SDB_SET_INT64(pRaw, dataPos, pAcct->updateTime) + SDB_SET_INT32(pRaw, dataPos, pAcct->acctId) + SDB_SET_INT32(pRaw, dataPos, pAcct->status) + SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxUsers) + SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxDbs) + SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxTimeSeries) + SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxStreams) + SDB_SET_INT64(pRaw, dataPos, pAcct->cfg.maxStorage) + SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.accessState) + SDB_SET_DATALEN(pRaw, dataPos); - pRaw->dataLen = dataLen; - pRaw->type = SDB_ACCT; - pRaw->sver = ACCT_VER; return pRaw; } -static SAcctObj *mnodeAcctActionDecode(SSdbRaw *pRaw) { - if (pRaw->sver != ACCT_VER) { +static SSdbRow *mnodeAcctActionDecode(SSdbRaw *pRaw) { + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + + if (sver != SDB_ACCT_VER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; return NULL; } - SAcctObj *pAcct = calloc(1, sizeof(SAcctObj)); - if (pAcct == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } + SSdbRow *pRow = sdbAllocRow(sizeof(SAcctObj)); + SAcctObj *pAcct = sdbGetRowObj(pRow); + if (pAcct == NULL) return NULL; + + int32_t dataPos = 0; + SDB_GET_BINARY(pRaw, pRow, dataPos, pAcct->acct, TSDB_USER_LEN) + SDB_GET_INT64(pRaw, pRow, dataPos, &pAcct->createdTime) + SDB_GET_INT64(pRaw, pRow, dataPos, &pAcct->updateTime) + SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->acctId) + SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->status) + SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.maxUsers) + SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.maxDbs) + SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.maxTimeSeries) + SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.maxStreams) + SDB_GET_INT64(pRaw, pRow, dataPos, &pAcct->cfg.maxStorage) + SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.accessState) - int32_t code = 0; - int32_t dataLen = pRaw->dataLen; - char *pData = pRaw->data; - SDB_GET_BINARY_VAL(pData, dataLen, pAcct->acct, TSDB_USER_LEN, code) - SDB_GET_INT64_VAL(pData, dataLen, pAcct->createdTime, code) - SDB_GET_INT64_VAL(pData, dataLen, pAcct->updateTime, code) - SDB_GET_INT32_VAL(pData, dataLen, pAcct->acctId, code) - SDB_GET_INT32_VAL(pData, dataLen, pAcct->status, code) - SDB_GET_INT32_VAL(pData, dataLen, pAcct->cfg.maxUsers, code) - SDB_GET_INT32_VAL(pData, dataLen, pAcct->cfg.maxDbs, code) - SDB_GET_INT32_VAL(pData, dataLen, pAcct->cfg.maxTimeSeries, code) - SDB_GET_INT32_VAL(pData, dataLen, pAcct->cfg.maxStreams, code) - SDB_GET_INT64_VAL(pData, dataLen, pAcct->cfg.maxStorage, code) - SDB_GET_INT32_VAL(pData, dataLen, pAcct->cfg.accessState, code) - - if (code != 0) { - tfree(pAcct); - terrno = code; - return NULL; - } - - return pAcct; + return pRow; } static int32_t mnodeAcctActionInsert(SAcctObj *pAcct) { return 0; } @@ -86,7 +73,9 @@ static int32_t mnodeAcctActionInsert(SAcctObj *pAcct) { return 0; } static int32_t mnodeAcctActionDelete(SAcctObj *pAcct) { return 0; } static int32_t mnodeAcctActionUpdate(SAcctObj *pSrcAcct, SAcctObj *pDstAcct) { - memcpy(pDstAcct, pSrcAcct, (int32_t)((char *)&pDstAcct->info - (char *)&pDstAcct)); + SAcctObj tObj; + int32_t len = (int32_t)((int8_t *)&tObj.info - (int8_t *)&tObj); + memcpy(pDstAcct, pSrcAcct, len); return 0; } @@ -106,9 +95,8 @@ static int32_t mnodeCreateDefaultAcct() { .accessState = TSDB_VN_ALL_ACCCESS}; SSdbRaw *pRaw = mnodeAcctActionEncode(&acctObj); - if (pRaw == NULL) { - return -1; - } + if (pRaw == NULL) return -1; + sdbSetRawStatus(pRaw, SDB_STATUS_READY); return sdbWrite(pRaw); } diff --git a/source/dnode/mnode/impl/src/mnodeSync.c b/source/dnode/mnode/impl/src/mnodeSync.c index 7541ab6b59..fd34793172 100644 --- a/source/dnode/mnode/impl/src/mnodeSync.c +++ b/source/dnode/mnode/impl/src/mnodeSync.c @@ -21,8 +21,8 @@ int32_t mnodeInitSync() { return 0; } void mnodeCleanUpSync() {} int32_t mnodeSyncPropose(SSdbRaw *pRaw, void *pData) { - trnApply(pRaw, pData, 0); - free(pRaw); + trnApply(pData, pData, 0); + free(pData); return 0; } diff --git a/source/dnode/mnode/impl/src/mnodeUser.c b/source/dnode/mnode/impl/src/mnodeUser.c index b15dc78ea9..63aa171238 100644 --- a/source/dnode/mnode/impl/src/mnodeUser.c +++ b/source/dnode/mnode/impl/src/mnodeUser.c @@ -19,59 +19,46 @@ #include "tglobal.h" #include "tkey.h" -#define USER_VER 1 +#define SDB_USER_VER 1 static SSdbRaw *mnodeUserActionEncode(SUserObj *pUser) { - SSdbRaw *pRaw = calloc(1, sizeof(SUserObj) + sizeof(SSdbRaw)); - if (pRaw == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } + SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, SDB_USER_VER, sizeof(SAcctObj)); + if (pRaw == NULL) return NULL; - int32_t dataLen = 0; - char *pData = pRaw->data; - SDB_SET_BINARY_VAL(pData, dataLen, pUser->user, TSDB_USER_LEN) - SDB_SET_BINARY_VAL(pData, dataLen, pUser->pass, TSDB_KEY_LEN) - SDB_SET_BINARY_VAL(pData, dataLen, pUser->acct, TSDB_KEY_LEN) - SDB_SET_INT64_VAL(pData, dataLen, pUser->createdTime) - SDB_SET_INT64_VAL(pData, dataLen, pUser->updateTime) - SDB_SET_INT32_VAL(pData, dataLen, pUser->rootAuth) + int32_t dataPos = 0; + SDB_SET_BINARY(pRaw, dataPos, pUser->user, TSDB_USER_LEN) + SDB_SET_BINARY(pRaw, dataPos, pUser->pass, TSDB_KEY_LEN) + SDB_SET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN) + SDB_SET_INT64(pRaw, dataPos, pUser->createdTime) + SDB_SET_INT64(pRaw, dataPos, pUser->updateTime) + SDB_SET_INT8(pRaw, dataPos, pUser->rootAuth) + SDB_SET_DATALEN(pRaw, dataPos); - pRaw->dataLen = dataLen; - pRaw->type = SDB_USER; - pRaw->sver = USER_VER; return pRaw; } -static SUserObj *mnodeUserActionDecode(SSdbRaw *pRaw) { - if (pRaw->sver != USER_VER) { +static SSdbRow *mnodeUserActionDecode(SSdbRaw *pRaw) { + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + + if (sver != SDB_USER_VER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; return NULL; } - SUserObj *pUser = calloc(1, sizeof(SUserObj)); - if (pUser == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } + SSdbRow *pRow = sdbAllocRow(sizeof(SUserObj)); + SUserObj *pUser = sdbGetRowObj(pRow); + if (pUser == NULL) return NULL; - int32_t code = 0; - int32_t dataLen = pRaw->dataLen; - char *pData = pRaw->data; - SDB_GET_BINARY_VAL(pData, dataLen, pUser->user, TSDB_USER_LEN, code) - SDB_GET_BINARY_VAL(pData, dataLen, pUser->pass, TSDB_KEY_LEN, code) - SDB_GET_BINARY_VAL(pData, dataLen, pUser->acct, TSDB_USER_LEN, code) - SDB_GET_INT64_VAL(pData, dataLen, pUser->createdTime, code) - SDB_GET_INT64_VAL(pData, dataLen, pUser->updateTime, code) - SDB_GET_INT32_VAL(pData, dataLen, pUser->rootAuth, code) + int32_t dataPos = 0; + SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->user, TSDB_USER_LEN) + SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->pass, TSDB_KEY_LEN) + SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->acct, TSDB_USER_LEN) + SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->createdTime) + SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->updateTime) + SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->rootAuth) - if (code != 0) { - tfree(pUser); - terrno = code; - return NULL; - } - - return pUser; + return pRow; } static int32_t mnodeUserActionInsert(SUserObj *pUser) { @@ -105,7 +92,9 @@ static int32_t mnodeUserActionDelete(SUserObj *pUser) { } static int32_t mnodeUserActionUpdate(SUserObj *pSrcUser, SUserObj *pDstUser) { - memcpy(pDstUser, pSrcUser, (int32_t)((char *)&pDstUser->prohibitDbHash - (char *)&pDstUser)); + SUserObj tObj; + int32_t len = (int32_t)((int8_t *)tObj.prohibitDbHash - (int8_t *)&tObj); + memcpy(pDstUser, pSrcUser, len); return 0; } @@ -122,9 +111,8 @@ static int32_t mnodeCreateDefaultUser(char *acct, char *user, char *pass) { } SSdbRaw *pRaw = mnodeUserActionEncode(&userObj); - if (pRaw == NULL) { - return -1; - } + if (pRaw == NULL) return -1; + sdbSetRawStatus(pRaw, SDB_STATUS_READY); return sdbWrite(pRaw); } @@ -156,32 +144,28 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM STrans *pTrans = trnCreate(TRN_POLICY_ROLLBACK); if (pTrans == NULL) return -1; + trnSetRpcHandle(pTrans, pMsg->rpcMsg.handle); SSdbRaw *pRedoRaw = mnodeUserActionEncode(&userObj); if (pRedoRaw == NULL || trnAppendRedoLog(pTrans, pRedoRaw) != 0) { trnDrop(pTrans); return -1; } - pRedoRaw->status = SDB_STATUS_CREATING; - pRedoRaw->action = SDB_ACTION_INSERT; + sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING); SSdbRaw *pUndoRaw = mnodeUserActionEncode(&userObj); if (pUndoRaw == NULL || trnAppendUndoLog(pTrans, pUndoRaw) != 0) { trnDrop(pTrans); return -1; } - pUndoRaw->status = SDB_STATUS_DROPPING; - pUndoRaw->action = SDB_ACTION_DELETE; + sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED); SSdbRaw *pCommitRaw = mnodeUserActionEncode(&userObj); if (pCommitRaw == NULL || trnAppendCommitLog(pTrans, pCommitRaw) != 0) { trnDrop(pTrans); return -1; } - pCommitRaw->status = SDB_STATUS_READY; - pCommitRaw->action = SDB_ACTION_UPDATE; - - trnSetRpcHandle(pTrans, pMsg->rpcMsg.handle); + sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); if (trnPrepare(pTrans, mnodeSyncPropose) != 0) { trnDrop(pTrans); diff --git a/source/dnode/mnode/sdb/inc/sdbInt.h b/source/dnode/mnode/sdb/inc/sdbInt.h index 222e16abe9..aa0b3c8a58 100644 --- a/source/dnode/mnode/sdb/inc/sdbInt.h +++ b/source/dnode/mnode/sdb/inc/sdbInt.h @@ -36,6 +36,22 @@ extern "C" { #define SDB_MAX_SIZE (32 * 1024) +typedef struct SSdbRaw { + int8_t sdb; + int8_t sver; + int8_t status; + int8_t reserved; + int32_t dataLen; + char pData[]; +} SSdbRaw; + +typedef struct SSdbRow { + ESdbType sdb; + ESdbStatus status; + int32_t refCount; + char pObj[]; +} SSdbRow; + typedef struct { char *currDir; char *syncDir; @@ -53,12 +69,9 @@ typedef struct { SdbDecodeFp decodeFps[SDB_MAX]; } SSdbMgr; -typedef struct { - ESdbStatus status; - int32_t refCount; - int32_t dataLen; - char pData[]; -} SSdbRow; +extern SSdbMgr tsSdb; + +int32_t sdbWriteImp(SSdbRaw *pRaw); #ifdef __cplusplus } diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 7489a46552..8e9b7fbecc 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -17,224 +17,7 @@ #include "sdbInt.h" #include "tglobal.h" -static SSdbMgr tsSdb = {0}; - -static int32_t sdbCreateDir() { - if (!taosMkDir(tsSdb.currDir)) { - terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to create dir:%s since %s", tsSdb.currDir, terrstr()); - return -1; - } - - if (!taosMkDir(tsSdb.syncDir)) { - terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to create dir:%s since %s", tsSdb.syncDir, terrstr()); - return -1; - } - - if (!taosMkDir(tsSdb.tmpDir)) { - terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to create dir:%s since %s", tsSdb.tmpDir, terrstr()); - return -1; - } - - return 0; -} - -static int32_t sdbRunDeployFp() { - for (int32_t i = SDB_START; i < SDB_MAX; ++i) { - SdbDeployFp fp = tsSdb.deployFps[i]; - if (fp == NULL) continue; - if ((*fp)() != 0) { - mError("failed to deploy sdb:%d since %s", i, terrstr()); - return -1; - } - } - - return 0; -} - -static SHashObj *sdbGetHash(int32_t sdb) { - if (sdb >= SDB_MAX || sdb <= SDB_START) { - terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE; - return NULL; - } - - SHashObj *hash = tsSdb.hashObjs[sdb]; - if (hash == NULL) { - terrno = TSDB_CODE_SDB_APP_ERROR; - return NULL; - } - - return hash; -} - -int32_t sdbWrite(SSdbRaw *pRaw) { - SHashObj *hash = sdbGetHash(pRaw->type); - switch (pRaw->action) { - case SDB_ACTION_INSERT: - break; - case SDB_ACTION_UPDATE: - break; - case SDB_ACTION_DELETE: - break; - - default: - break; - } - - return 0; -} - -static int32_t sdbWriteVersion(FileFd fd) { return 0; } - -static int32_t sdbReadVersion(FileFd fd) { return 0; } - -static int32_t sdbReadDataFile() { - int32_t code = 0; - - SSdbRaw *pRaw = malloc(SDB_MAX_SIZE); - if (pRaw == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - char file[PATH_MAX] = {0}; - snprintf(file, sizeof(file), "%ssdb.data", tsSdb.currDir); - FileFd fd = taosOpenFileCreateWrite(file); - if (fd <= 0) { - code = TAOS_SYSTEM_ERROR(errno); - mError("failed to open file:%s for read since %s", file, tstrerror(code)); - return code; - } - - int64_t offset = 0; - while (1) { - int32_t ret = (int32_t)taosReadFile(fd, pRaw, sizeof(SSdbRaw)); - if (ret == 0) break; - - if (ret < 0) { - code = TAOS_SYSTEM_ERROR(errno); - mError("failed to read file:%s since %s", file, tstrerror(code)); - break; - } - - if (ret < sizeof(SSdbRaw)) { - code = TSDB_CODE_SDB_APP_ERROR; - mError("failed to read file:%s since %s", file, tstrerror(code)); - break; - } - - code = sdbWrite(pRaw); - if (code != 0) { - mError("failed to read file:%s since %s", file, tstrerror(code)); - goto PARSE_SDB_DATA_ERROR; - } - } - - code = 0; - -PARSE_SDB_DATA_ERROR: - taosCloseFile(fd); - return code; -} - -static int32_t sdbWriteDataFile() { - int32_t code = 0; - - char tmpfile[PATH_MAX] = {0}; - snprintf(tmpfile, sizeof(tmpfile), "%ssdb.data", tsSdb.tmpDir); - - FileFd fd = taosOpenFileCreateWrite(tmpfile); - if (fd <= 0) { - code = TAOS_SYSTEM_ERROR(errno); - mError("failed to open file:%s for write since %s", tmpfile, tstrerror(code)); - return code; - } - - for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) { - SHashObj *hash = tsSdb.hashObjs[i]; - if (!hash) continue; - - SdbEncodeFp encodeFp = tsSdb.encodeFps[i]; - if (!encodeFp) continue; - - SSdbRow *pRow = taosHashIterate(hash, NULL); - while (pRow != NULL) { - if (pRow->status == SDB_STATUS_READY) continue; - SSdbRaw *pRaw = (*encodeFp)(pRow->pData); - if (pRaw != NULL) { - taosWriteFile(fd, pRaw, sizeof(SSdbRaw) + pRaw->dataLen); - } else { - taosHashCancelIterate(hash, pRow); - code = TSDB_CODE_SDB_APP_ERROR; - break; - } - - pRow = taosHashIterate(hash, pRow); - } - } - - if (code == 0) { - code = sdbWriteVersion(fd); - } - - taosCloseFile(fd); - - if (code == 0) { - code = taosFsyncFile(fd); - } - - if (code != 0) { - char curfile[PATH_MAX] = {0}; - snprintf(curfile, sizeof(curfile), "%ssdb.data", tsSdb.currDir); - code = taosRenameFile(tmpfile, curfile); - } - - if (code != 0) { - mError("failed to write sdb file since %s", tstrerror(code)); - } else { - mInfo("write sdb file successfully"); - } - - return code; -} - -int32_t sdbRead() { - int32_t code = sdbReadDataFile(); - if (code != 0) { - return code; - } - - mInfo("read sdb file successfully"); - return -1; -} - -int32_t sdbCommit() { - int32_t code = sdbWriteDataFile(); - if (code != 0) { - return code; - } - - return 0; -} - -int32_t sdbDeploy() { - if (sdbCreateDir() != 0) { - return -1; - } - - if (sdbRunDeployFp() != 0) { - return -1; - } - - if (sdbCommit() != 0) { - return -1; - } - - return 0; -} - -void sdbUnDeploy() {} +SSdbMgr tsSdb = {0}; int32_t sdbInit() { char path[PATH_MAX + 100]; @@ -262,7 +45,7 @@ int32_t sdbInit() { type = TSDB_DATA_TYPE_BINARY; } - SHashObj *hash = taosHashInit(128, taosGetDefaultHashFunction(type), true, HASH_NO_LOCK); + SHashObj *hash = taosHashInit(64, taosGetDefaultHashFunction(type), true, HASH_NO_LOCK); if (hash == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -275,10 +58,6 @@ int32_t sdbInit() { } void sdbCleanup() { - if (tsSdb.curVer != tsSdb.lastCommitVer) { - sdbCommit(); - } - if (tsSdb.currDir != NULL) { tfree(tsSdb.currDir); } @@ -309,108 +88,4 @@ void sdbSetTable(SSdbTable table) { tsSdb.deployFps[sdb] = table.deployFp; tsSdb.encodeFps[sdb] = table.encodeFp; tsSdb.decodeFps[sdb] = table.decodeFp; -} - -#if 0 -void *sdbInsertRow(ESdbType sdb, void *p) { - SdbHead *pHead = p; - pHead->type = sdb; - pHead->status = SDB_AVAIL; - - char *pKey = (char *)pHead + sizeof(pHead); - int32_t keySize; - EKeyType keyType = tsSdb.keyTypes[pHead->type]; - int32_t dataSize = tsSdb.dataSize[pHead->type]; - - SHashObj *hash = sdbGetHash(pHead->type); - if (hash == NULL) { - return NULL; - } - - if (keyType == SDBINT32) { - keySize = sizeof(int32_t); - } else if (keyType == SDB_KEY_BINARY) { - keySize = strlen(pKey) + 1; - } else { - keySize = sizeof(int64_t); - } - - taosHashPut(hash, pKey, keySize, pHead, dataSize); - return taosHashGet(hash, pKey, keySize); -} - -void sdbDeleteRow(ESdbType sdb, void *p) { - SdbHead *pHead = p; - pHead->status = SDB_STATUS_DROPPED; -} - -void *sdbUpdateRow(ESdbType sdb, void *pHead) { return sdbInsertRow(sdb, pHead); } - -#endif - -void *sdbAcquire(ESdbType sdb, void *pKey) { - terrno = 0; - - SHashObj *hash = sdbGetHash(sdb); - if (hash == NULL) { - return NULL; - } - - int32_t keySize; - EKeyType keyType = tsSdb.keyTypes[sdb]; - - switch (keyType) { - case SDB_KEY_INT32: - keySize = sizeof(int32_t); - break; - case SDB_KEY_INT64: - keySize = sizeof(int64_t); - break; - case SDB_KEY_BINARY: - keySize = strlen(pKey) + 1; - break; - default: - keySize = sizeof(int32_t); - } - - SSdbRow *pRow = taosHashGet(hash, pKey, keySize); - if (pRow == NULL) return NULL; - - if (pRow->status == SDB_STATUS_READY) { - atomic_add_fetch_32(&pRow->refCount, 1); - return pRow->pData; - } else { - terrno = -1; // todo - return NULL; - } -} - -void sdbRelease(void *pObj) { - SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow)); - atomic_sub_fetch_32(&pRow->refCount, 1); -} - -void *sdbFetchRow(ESdbType sdb, void *pIter) { - SHashObj *hash = sdbGetHash(sdb); - if (hash == NULL) { - return NULL; - } - - return taosHashIterate(hash, pIter); -} - -void sdbCancelFetch(ESdbType sdb, void *pIter) { - SHashObj *hash = sdbGetHash(sdb); - if (hash == NULL) { - return; - } - taosHashCancelIterate(hash, pIter); -} - -int32_t sdbGetSize(ESdbType sdb) { - SHashObj *hash = sdbGetHash(sdb); - if (hash == NULL) { - return 0; - } - return taosHashGetSize(hash); } \ No newline at end of file diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c new file mode 100644 index 0000000000..69c82c77f1 --- /dev/null +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -0,0 +1,258 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "sdbInt.h" +#include "tglobal.h" +#include "tchecksum.h" + +static int32_t sdbCreateDir() { + mDebug("start to create mnode at %s", tsMnodeDir); + + if (!taosMkDir(tsSdb.currDir)) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to create dir:%s since %s", tsSdb.currDir, terrstr()); + return -1; + } + + if (!taosMkDir(tsSdb.syncDir)) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to create dir:%s since %s", tsSdb.syncDir, terrstr()); + return -1; + } + + if (!taosMkDir(tsSdb.tmpDir)) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to create dir:%s since %s", tsSdb.tmpDir, terrstr()); + return -1; + } + + return 0; +} + +static int32_t sdbRunDeployFp() { + mDebug("start to run deploy functions"); + + for (int32_t i = SDB_START; i < SDB_MAX; ++i) { + SdbDeployFp fp = tsSdb.deployFps[i]; + if (fp == NULL) continue; + if ((*fp)() != 0) { + mError("failed to deploy sdb:%d since %s", i, terrstr()); + return -1; + } + } + + return 0; +} + +static int32_t sdbReadDataFile() { + SSdbRaw *pRaw = malloc(SDB_MAX_SIZE); + if (pRaw == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + char file[PATH_MAX] = {0}; + snprintf(file, sizeof(file), "%ssdb.data", tsSdb.currDir); + FileFd fd = taosOpenFileRead(file); + if (fd <= 0) { + free(pRaw); + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to open file:%s for read since %s", file, terrstr()); + return -1; + } + + int64_t offset = 0; + int32_t code = 0; + int32_t readLen = 0; + int64_t ret = 0; + + while (1) { + readLen = sizeof(SSdbRaw); + ret = taosReadFile(fd, pRaw, readLen); + if (ret == 0) break; + + if (ret < 0) { + code = TAOS_SYSTEM_ERROR(errno); + mError("failed to read file:%s since %s", file, tstrerror(code)); + break; + } + + if (ret != readLen) { + code = TSDB_CODE_FILE_CORRUPTED; + mError("failed to read file:%s since %s", file, tstrerror(code)); + break; + } + + readLen = pRaw->dataLen + sizeof(int32_t); + ret = taosReadFile(fd, pRaw->pData, readLen); + if (ret < 0) { + code = TAOS_SYSTEM_ERROR(errno); + mError("failed to read file:%s since %s", file, tstrerror(code)); + break; + } + + if (ret != readLen) { + code = TSDB_CODE_FILE_CORRUPTED; + mError("failed to read file:%s since %s", file, tstrerror(code)); + break; + } + + int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t); + if (!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen) != 0) { + code = TSDB_CODE_CHECKSUM_ERROR; + mError("failed to read file:%s since %s", file, tstrerror(code)); + break; + } + + code = sdbWriteImp(pRaw); + if (code != 0) { + mError("failed to read file:%s since %s", file, terrstr()); + goto PARSE_SDB_DATA_ERROR; + } + } + + code = 0; + +PARSE_SDB_DATA_ERROR: + taosCloseFile(fd); + sdbFreeRaw(pRaw); + terrno = code; + return code; +} + +static int32_t sdbWriteDataFile() { + char tmpfile[PATH_MAX] = {0}; + snprintf(tmpfile, sizeof(tmpfile), "%ssdb.data", tsSdb.tmpDir); + + FileFd fd = taosOpenFileCreateWrite(tmpfile); + if (fd <= 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to open file:%s for write since %s", tmpfile, terrstr()); + return -1; + } + + int32_t code = 0; + + for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) { + SdbEncodeFp encodeFp = tsSdb.encodeFps[i]; + if (encodeFp == NULL) continue; + + SHashObj *hash = tsSdb.hashObjs[i]; + SRWLatch *pLock = &tsSdb.locks[i]; + taosWLockLatch(pLock); + + SSdbRow **ppRow = taosHashIterate(hash, NULL); + while (ppRow != NULL) { + SSdbRow *pRow = *ppRow; + if (pRow == NULL || pRow->status != SDB_STATUS_READY) { + ppRow = taosHashIterate(hash, ppRow); + continue; + } + + SSdbRaw *pRaw = (*encodeFp)(pRow->pObj); + if (pRaw != NULL) { + pRaw->status = pRow->status; + int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen; + if (taosWriteFile(fd, pRaw, writeLen) != writeLen) { + code = TAOS_SYSTEM_ERROR(terrno); + taosHashCancelIterate(hash, ppRow); + break; + } + + int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen); + if (taosWriteFile(fd, &cksum, sizeof(int32_t)) != sizeof(int32_t)) { + code = TAOS_SYSTEM_ERROR(terrno); + taosHashCancelIterate(hash, ppRow); + break; + } + } else { + code = TSDB_CODE_SDB_APP_ERROR; + taosHashCancelIterate(hash, ppRow); + break; + } + + ppRow = taosHashIterate(hash, ppRow); + } + taosWUnLockLatch(pLock); + } + + if (code == 0) { + code = taosFsyncFile(fd); + } + + taosCloseFile(fd); + + if (code == 0) { + char curfile[PATH_MAX] = {0}; + snprintf(curfile, sizeof(curfile), "%ssdb.data", tsSdb.currDir); + code = taosRenameFile(tmpfile, curfile); + } + + if (code != 0) { + terrno = code; + mError("failed to write sdb file since %s", terrstr()); + } else { + mDebug("write sdb file successfully"); + } + + return code; +} + +int32_t sdbOpen() { + mDebug("start to read mnode file"); + + if (sdbReadDataFile() != 0) { + return -1; + } + + return 0; +} + +void sdbClose() { + if (tsSdb.curVer != tsSdb.lastCommitVer) { + mDebug("start to write mnode file"); + sdbWriteDataFile(); + } + + for (int32_t i = 0; i < SDB_MAX; ++i) { + SHashObj *hash = tsSdb.hashObjs[i]; + if (hash != NULL) { + taosHashClear(hash); + } + } +} + +int32_t sdbDeploy() { + if (sdbCreateDir() != 0) { + return -1; + } + + if (sdbRunDeployFp() != 0) { + return -1; + } + + if (sdbWriteDataFile() != 0) { + return -1; + } + + sdbClose(); + return 0; +} + +void sdbUnDeploy() { + mDebug("start to undeploy mnode"); + taosRemoveDir(tsMnodeDir); +} diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c new file mode 100644 index 0000000000..9c19e7f1a2 --- /dev/null +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -0,0 +1,280 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "sdbInt.h" +#include "tglobal.h" + +static SHashObj *sdbGetHash(int32_t sdb) { + if (sdb >= SDB_MAX || sdb <= SDB_START) { + terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE; + return NULL; + } + + SHashObj *hash = tsSdb.hashObjs[sdb]; + if (hash == NULL) { + terrno = TSDB_CODE_SDB_APP_ERROR; + return NULL; + } + + return hash; +} + +static int32_t sdbGetkeySize(ESdbType sdb, void *pKey) { + int32_t keySize; + EKeyType keyType = tsSdb.keyTypes[sdb]; + + if (keyType == SDB_KEY_INT32) { + keySize = sizeof(int32_t); + } else if (keyType == SDB_KEY_BINARY) { + keySize = strlen(pKey) + 1; + } else { + keySize = sizeof(int64_t); + } + + return keySize; +} + +static int32_t sdbInsertRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { + SRWLatch *pLock = &tsSdb.locks[pRow->sdb]; + taosWLockLatch(pLock); + + SSdbRow *pDstRow = taosHashGet(hash, pRow->pObj, keySize); + if (pDstRow != NULL) { + terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE; + taosWUnLockLatch(pLock); + sdbFreeRow(pRow); + return -1; + } + + pRow->refCount = 1; + pRow->status = pRaw->status; + + if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosWUnLockLatch(pLock); + sdbFreeRow(pRow); + return -1; + } + + taosWUnLockLatch(pLock); + + SdbInsertFp insertFp = tsSdb.insertFps[pRow->sdb]; + if (insertFp != NULL) { + if ((*insertFp)(pRow->pObj) != 0) { + taosWLockLatch(pLock); + taosHashRemove(hash, pRow->pObj, keySize); + taosWUnLockLatch(pLock); + sdbFreeRow(pRow); + return -1; + } + } + + return 0; +} + +static int32_t sdbUpdateRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { + SRWLatch *pLock = &tsSdb.locks[pRow->sdb]; + taosRLockLatch(pLock); + + SSdbRow **ppDstRow = taosHashGet(hash, pRow->pObj, keySize); + if (ppDstRow == NULL || *ppDstRow == NULL) { + taosRUnLockLatch(pLock); + return sdbInsertRow(hash, pRaw, pRow, keySize); + } + SSdbRow *pDstRow = *ppDstRow; + + pRow->status = pRaw->status; + taosRUnLockLatch(pLock); + + SdbUpdateFp updateFp = tsSdb.updateFps[pRow->sdb]; + if (updateFp != NULL) { + (*updateFp)(pRow->pObj, pDstRow->pObj); + } + + sdbFreeRow(pRow); + return 0; +} + +static int32_t sdbDeleteRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { + SRWLatch *pLock = &tsSdb.locks[pRow->sdb]; + taosWLockLatch(pLock); + + SSdbRow **ppDstRow = taosHashGet(hash, pRow->pObj, keySize); + if (ppDstRow == NULL || *ppDstRow == NULL) { + terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; + taosWUnLockLatch(pLock); + sdbFreeRow(pRow); + return -1; + } + SSdbRow *pDstRow = *ppDstRow; + + pDstRow->status = pRaw->status; + taosHashRemove(hash, pDstRow->pObj, keySize); + taosWUnLockLatch(pLock); + + SdbDeleteFp deleteFp = tsSdb.deleteFps[pDstRow->sdb]; + if (deleteFp != NULL) { + (void)(*deleteFp)(pDstRow->pObj); + } + + sdbRelease(pDstRow->pObj); + sdbFreeRow(pRow); + return 0; +} + +int32_t sdbWriteImp(SSdbRaw *pRaw) { + SHashObj *hash = sdbGetHash(pRaw->sdb); + if (hash == NULL) return -1; + + SdbDecodeFp decodeFp = tsSdb.decodeFps[pRaw->sdb]; + SSdbRow *pRow = (*decodeFp)(pRaw); + if (pRow == NULL) { + terrno = TSDB_CODE_SDB_INVALID_DATA_CONTENT; + return -1; + } + + pRow->sdb = pRaw->sdb; + + int32_t keySize = sdbGetkeySize(pRow->sdb, pRow->pObj); + int32_t code = -1; + + switch (pRaw->status) { + case SDB_STATUS_CREATING: + code = sdbInsertRow(hash, pRaw, pRow, keySize); + break; + case SDB_STATUS_READY: + case SDB_STATUS_DROPPING: + code = sdbUpdateRow(hash, pRaw, pRow, keySize); + break; + case SDB_STATUS_DROPPED: + code = sdbDeleteRow(hash, pRaw, pRow, keySize); + break; + default: + terrno = TSDB_CODE_SDB_INVALID_ACTION_TYPE; + break; + } + + return code; +} + +int32_t sdbWrite(SSdbRaw *pRaw) { + int32_t code = sdbWriteImp(pRaw); + sdbFreeRaw(pRaw); + return code; +} + +void *sdbAcquire(ESdbType sdb, void *pKey) { + SHashObj *hash = sdbGetHash(sdb); + if (hash == NULL) return NULL; + + void *pRet = NULL; + int32_t keySize = sdbGetkeySize(sdb, pKey); + + SRWLatch *pLock = &tsSdb.locks[sdb]; + taosRLockLatch(pLock); + + SSdbRow **ppRow = taosHashGet(hash, pKey, keySize); + if (ppRow == NULL || *ppRow == NULL) { + terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; + taosRUnLockLatch(pLock); + return NULL; + } + + SSdbRow *pRow = *ppRow; + switch (pRow->status) { + case SDB_STATUS_READY: + atomic_add_fetch_32(&pRow->refCount, 1); + pRet = pRow->pObj; + break; + case SDB_STATUS_CREATING: + terrno = TSDB_CODE_SDB_OBJ_CREATING; + break; + case SDB_STATUS_DROPPING: + terrno = TSDB_CODE_SDB_OBJ_DROPPING; + break; + default: + terrno = TSDB_CODE_SDB_APP_ERROR; + break; + } + + taosRUnLockLatch(pLock); + return pRet; +} + +void sdbRelease(void *pObj) { + if (pObj == NULL) return; + + SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow)); + if (pRow->sdb >= SDB_MAX || pRow->sdb <= SDB_START) return; + + SRWLatch *pLock = &tsSdb.locks[pRow->sdb]; + taosRLockLatch(pLock); + + int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1); + if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) { + sdbFreeRow(pRow); + } + + taosRUnLockLatch(pLock); +} + +void *sdbFetch(ESdbType sdb, void *pIter, void **ppObj) { + SHashObj *hash = sdbGetHash(sdb); + if (hash == NULL) return NULL; + + SRWLatch *pLock = &tsSdb.locks[sdb]; + taosRLockLatch(pLock); + + SSdbRow **ppRow = taosHashIterate(hash, ppRow); + while (ppRow != NULL) { + SSdbRow *pRow = *ppRow; + if (pRow == NULL || pRow->status != SDB_STATUS_READY) { + ppRow = taosHashIterate(hash, ppRow); + continue; + } + + atomic_add_fetch_32(&pRow->refCount, 1); + *ppObj = pRow->pObj; + break; + } + taosRUnLockLatch(pLock); + + return ppRow; +} + +void sdbCancelFetch(void *pIter) { + if (pIter == NULL) return; + SSdbRow *pRow = *(SSdbRow **)pIter; + SHashObj *hash = sdbGetHash(pRow->sdb); + if (hash == NULL) return; + + SRWLatch *pLock = &tsSdb.locks[pRow->sdb]; + taosRLockLatch(pLock); + taosHashCancelIterate(hash, pIter); + taosRUnLockLatch(pLock); +} + +int32_t sdbGetSize(ESdbType sdb) { + SHashObj *hash = sdbGetHash(sdb); + if (hash == NULL) return 0; + + SRWLatch *pLock = &tsSdb.locks[sdb]; + taosRLockLatch(pLock); + int32_t size = taosHashGetSize(hash); + taosRUnLockLatch(pLock); + + return size; +} diff --git a/source/dnode/mnode/sdb/src/sdbRaw.c b/source/dnode/mnode/sdb/src/sdbRaw.c new file mode 100644 index 0000000000..68a0abc859 --- /dev/null +++ b/source/dnode/mnode/sdb/src/sdbRaw.c @@ -0,0 +1,196 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "sdbInt.h" + +SSdbRaw *sdbAllocRaw(ESdbType sdb, int8_t sver, int32_t dataLen) { + SSdbRaw *pRaw = calloc(1, dataLen + sizeof(SSdbRaw)); + if (pRaw == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pRaw->sdb = sdb; + pRaw->sver = sver; + pRaw->dataLen = dataLen; + return pRaw; +} + +void sdbFreeRaw(SSdbRaw *pRaw) { free(pRaw); } + +int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataPos + sizeof(int8_t) > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + *(int8_t *)(pRaw->pData + dataPos) = val; + return 0; +} + +int32_t sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataPos + sizeof(int32_t) > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + *(int32_t *)(pRaw->pData + dataPos) = val; + return 0; +} + +int32_t sdbSetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t val) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataPos + sizeof(int64_t) > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + *(int64_t *)(pRaw->pData + dataPos) = val; + return 0; +} + +int32_t sdbSetRawBinary(SSdbRaw *pRaw, int32_t dataPos, const char *pVal, int32_t valLen) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataPos + valLen > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + memcpy(pRaw->pData + dataPos, pVal, valLen); + return 0; +} + +int32_t sdbSetRawDataLen(SSdbRaw *pRaw, int32_t dataLen) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataLen > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + pRaw->dataLen = dataLen; + return 0; +} + +int32_t sdbSetRawStatus(SSdbRaw *pRaw, ESdbStatus status) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + pRaw->status = status; + return 0; +} + +int32_t sdbGetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t *val) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataPos + sizeof(int8_t) > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + *val = *(int8_t *)(pRaw->pData + dataPos); + return 0; +} + +int32_t sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataPos + sizeof(int32_t) > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + *val = *(int32_t *)(pRaw->pData + dataPos); + return 0; +} + +int32_t sdbGetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t *val) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataPos + sizeof(int64_t) > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + *val = *(int64_t *)(pRaw->pData + dataPos); + return 0; +} + +int32_t sdbGetRawBinary(SSdbRaw *pRaw, int32_t dataPos, char *pVal, int32_t valLen) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataPos + valLen > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + memcpy(pVal, pRaw->pData + dataPos, valLen); + return 0; +} + +int32_t sdbGetRawSoftVer(SSdbRaw *pRaw, int8_t *sver) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + *sver = pRaw->sver; + return 0; +} + +int32_t sdbGetRawTotalSize(SSdbRaw *pRaw) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + return sizeof(SSdbRaw) + pRaw->dataLen; +} \ No newline at end of file diff --git a/source/dnode/mnode/sdb/src/sdbRow.c b/source/dnode/mnode/sdb/src/sdbRow.c new file mode 100644 index 0000000000..68a0faa7b9 --- /dev/null +++ b/source/dnode/mnode/sdb/src/sdbRow.c @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "sdbInt.h" + +SSdbRow *sdbAllocRow(int32_t objSize) { + SSdbRow *pRow = calloc(1, objSize + sizeof(SSdbRow)); + if (pRow == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + return pRow; +} + +void *sdbGetRowObj(SSdbRow *pRow) { + if (pRow == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + return pRow->pObj; +} + +void sdbFreeRow(SSdbRow *pRow) { free(pRow); } diff --git a/source/dnode/mnode/transaction/inc/trnInt.h b/source/dnode/mnode/transaction/inc/trnInt.h index 03860734ee..771217dcc0 100644 --- a/source/dnode/mnode/transaction/inc/trnInt.h +++ b/source/dnode/mnode/transaction/inc/trnInt.h @@ -45,15 +45,15 @@ typedef enum { } ETrnStage; typedef struct STrans { - int32_t id; - ETrnStage stage; - ETrnPolicy policy; - void *rpcHandle; - SArray *redoLogs; - SArray *undoLogs; - SArray *commitLogs; - SArray *redoActions; - SArray *undoActions; + int32_t id; + int8_t stage; + int8_t policy; + void *rpcHandle; + SArray *redoLogs; + SArray *undoLogs; + SArray *commitLogs; + SArray *redoActions; + SArray *undoActions; } STrans; SSdbRaw *trnActionEncode(STrans *pTrans); diff --git a/source/dnode/mnode/transaction/src/trn.c b/source/dnode/mnode/transaction/src/trn.c index 1f65eb72ad..0d7c1a061e 100644 --- a/source/dnode/mnode/transaction/src/trn.c +++ b/source/dnode/mnode/transaction/src/trn.c @@ -16,8 +16,10 @@ #define _DEFAULT_SOURCE #include "trnInt.h" +#define SDB_TRANS_VER 1 + SSdbRaw *trnActionEncode(STrans *pTrans) { - int32_t rawDataLen = 5 * sizeof(int32_t); + int32_t rawDataLen = 10 * sizeof(int32_t); int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs); int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs); int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs); @@ -25,91 +27,84 @@ SSdbRaw *trnActionEncode(STrans *pTrans) { int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions); for (int32_t index = 0; index < redoLogNum; ++index) { - SSdbRaw *pRawData = taosArrayGet(pTrans->redoLogs, index); - rawDataLen += (sizeof(SSdbRaw) + pRawData->dataLen); + SSdbRaw *pRaw = taosArrayGet(pTrans->redoLogs, index); + rawDataLen += sdbGetRawTotalSize(pRaw); } for (int32_t index = 0; index < undoLogNum; ++index) { - SSdbRaw *pRawData = taosArrayGet(pTrans->undoLogs, index); - rawDataLen += (sizeof(SSdbRaw) + pRawData->dataLen); + SSdbRaw *pRaw = taosArrayGet(pTrans->undoLogs, index); + rawDataLen += sdbGetRawTotalSize(pRaw); } for (int32_t index = 0; index < commitLogNum; ++index) { - SSdbRaw *pRawData = taosArrayGet(pTrans->commitLogs, index); - rawDataLen += (sizeof(SSdbRaw) + pRawData->dataLen); + SSdbRaw *pRaw = taosArrayGet(pTrans->commitLogs, index); + rawDataLen += sdbGetRawTotalSize(pRaw); } - SSdbRaw *pRaw = calloc(1, rawDataLen + sizeof(SSdbRaw)); - if (pRaw == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } + SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, SDB_TRANS_VER, rawDataLen); + if (pRaw == NULL) return NULL; - int32_t dataLen = 0; - char *pData = pRaw->data; - SDB_SET_INT32_VAL(pData, dataLen, redoLogNum) - SDB_SET_INT32_VAL(pData, dataLen, undoLogNum) - SDB_SET_INT32_VAL(pData, dataLen, commitLogNum) - SDB_SET_INT32_VAL(pData, dataLen, redoActionNum) - SDB_SET_INT32_VAL(pData, dataLen, undoActionNum) + int32_t dataPos = 0; + SDB_SET_INT32(pData, dataPos, pTrans->id) + SDB_SET_INT8(pData, dataPos, pTrans->stage) + SDB_SET_INT8(pData, dataPos, pTrans->policy) + SDB_SET_INT32(pData, dataPos, redoLogNum) + SDB_SET_INT32(pData, dataPos, undoLogNum) + SDB_SET_INT32(pData, dataPos, commitLogNum) + SDB_SET_INT32(pData, dataPos, redoActionNum) + SDB_SET_INT32(pData, dataPos, undoActionNum) + SDB_SET_DATALEN(pRaw, dataPos); - pRaw->dataLen = dataLen; - pRaw->type = SDB_TRANS; - pRaw->sver = TRN_VER; return pRaw; } STrans *trnActionDecode(SSdbRaw *pRaw) { - if (pRaw->sver != TRN_VER) { + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + + if (sver != SDB_TRANS_VER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; return NULL; } - STrans *pTrans = NULL; - if (pTrans == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } + SSdbRow *pRow = sdbAllocRow(sizeof(STrans)); + STrans *pTrans = sdbGetRowObj(pRow); + if (pTrans == NULL) return NULL; int32_t redoLogNum = 0; int32_t undoLogNum = 0; int32_t commitLogNum = 0; int32_t redoActionNum = 0; int32_t undoActionNum = 0; - SSdbRaw *pTmp = malloc(sizeof(SSdbRaw)); - int32_t code = 0; - int32_t dataLen = pRaw->dataLen; - char *pData = pRaw->data; - SDB_GET_INT32_VAL(pData, dataLen, redoLogNum, code) - SDB_GET_INT32_VAL(pData, dataLen, undoLogNum, code) - SDB_GET_INT32_VAL(pData, dataLen, commitLogNum, code) - SDB_GET_INT32_VAL(pData, dataLen, redoActionNum, code) - SDB_GET_INT32_VAL(pData, dataLen, undoActionNum, code) + int32_t dataPos = 0; + SDB_GET_INT32(pRaw, pRow, dataPos, &pTrans->id) + SDB_GET_INT8(pRaw, pRow, dataPos, &pTrans->stage) + SDB_GET_INT8(pRaw, pRow, dataPos, &pTrans->policy) + SDB_GET_INT32(pRaw, pRow, dataPos, &redoLogNum) + SDB_GET_INT32(pRaw, pRow, dataPos, &undoLogNum) + SDB_GET_INT32(pRaw, pRow, dataPos, &commitLogNum) + SDB_GET_INT32(pRaw, pRow, dataPos, &redoActionNum) + SDB_GET_INT32(pRaw, pRow, dataPos, &undoActionNum) for (int32_t index = 0; index < redoLogNum; ++index) { - SDB_GET_BINARY_VAL(pData, dataLen, pTmp, sizeof(SSdbRaw), code); - if (code == 0 && pTmp->dataLen > 0) { - SSdbRaw *pRead = malloc(sizeof(SSdbRaw) + pTmp->dataLen); - if (pRead == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - break; - } - memcpy(pRead, pTmp, sizeof(SSdbRaw)); - SDB_GET_BINARY_VAL(pData, dataLen, pRead->data, pRead->dataLen, code); - void *ret = taosArrayPush(pTrans->redoLogs, &pRead); - if (ret == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - break; - } + int32_t dataLen = 0; + SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen) + + char *pData = malloc(dataLen); + SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen); + void *ret = taosArrayPush(pTrans->redoLogs, pData); + if (ret == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + break; } } - if (code != 0) { - trnDrop(pTrans); - terrno = code; - return NULL; - } + // if (code != 0) { + // trnDrop(pTrans); + // terrno = code; + // return NULL; + // } return pTrans; } diff --git a/source/dnode/mnode/transaction/src/trnExec.c b/source/dnode/mnode/transaction/src/trnExec.c index 6bb46f1283..fc15c16225 100644 --- a/source/dnode/mnode/transaction/src/trnExec.c +++ b/source/dnode/mnode/transaction/src/trnExec.c @@ -52,7 +52,7 @@ int32_t trnApply(SSdbRaw *pRaw, void *pData, int32_t code) { return 0; } - if (sdbWrite(pRaw) != 0) { + if (sdbWrite(pData) != 0) { code = terrno; trnSendRpcRsp(pData, code); terrno = code; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 2053554666..0b10274c00 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -76,6 +76,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RANGE, "Out of range") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_PTR, "Invalid pointer") TAOS_DEFINE_ERROR(TSDB_CODE_MEMORY_CORRUPTED, "Memory corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_FILE_CORRUPTED, "Data file corrupted") +TAOS_DEFINE_ERROR(TSDB_CODE_CHECKSUM_ERROR, "Checksum error") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, "Invalid config message") TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs") @@ -147,13 +148,15 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_FAILED_TO_INIT_STEP, "failed to init compon TAOS_DEFINE_ERROR(TSDB_CODE_SDB_APP_ERROR, "Unexpected generic error in sdb") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_OBJ_ALREADY_THERE, "Object already there") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_OBJ_NOT_THERE, "Object not there") +TAOS_DEFINE_ERROR(TSDB_CODE_SDB_OBJ_CREATING, "Object is creating") +TAOS_DEFINE_ERROR(TSDB_CODE_SDB_OBJ_DROPPING, "Object is dropping") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_TABLE_TYPE, "Invalid table type") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_KEY_TYPE, "Invalid key type") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_ACTION_TYPE, "Invalid action type") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_STATUS_TYPE, "Invalid status type") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_VER, "Invalid raw data version") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_LEN, "Invalid raw data len") -TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_META_ROW, "Invalid meta row") +TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_CONTENT, "Invalid raw data content") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ALREADY_EXIST, "DNode already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_NOT_EXIST, "DNode does not exist")