This commit is contained in:
Shengliang Guan 2020-11-18 15:39:57 +08:00
parent f38b218247
commit 97e41cdf8d
11 changed files with 562 additions and 559 deletions

View File

@ -20,7 +20,9 @@
extern "C" { extern "C" {
#endif #endif
struct SMnodeMsg; #include "mnode.h"
struct SSdbTable;
typedef enum { typedef enum {
SDB_TABLE_CLUSTER = 0, SDB_TABLE_CLUSTER = 0,
@ -36,32 +38,32 @@ typedef enum {
} ESdbTable; } ESdbTable;
typedef enum { typedef enum {
SDB_KEY_STRING, SDB_KEY_STRING = 0,
SDB_KEY_INT, SDB_KEY_INT = 1,
SDB_KEY_AUTO, SDB_KEY_AUTO = 2,
SDB_KEY_VAR_STRING, SDB_KEY_VAR_STRING = 3,
} ESdbKey; } ESdbKey;
typedef enum { typedef enum {
SDB_OPER_GLOBAL, SDB_OPER_GLOBAL = 0,
SDB_OPER_LOCAL SDB_OPER_LOCAL = 1
} ESdbOper; } ESdbOper;
typedef struct SSWriteMsg { typedef struct SSWriteMsg {
ESdbOper type; ESdbOper type;
int32_t rowSize;
int32_t retCode; // for callback in sdb queue
int32_t processedCount; // for sync fwd callback int32_t processedCount; // for sync fwd callback
int32_t (*reqFp)(struct SMnodeMsg *pMsg); int32_t retCode; // for callback in sdb queue
int32_t (*writeCb)(struct SMnodeMsg *pMsg, int32_t code); int32_t rowSize;
void * table;
void * pObj;
void * rowData; void * rowData;
struct SMnodeMsg *pMsg; int32_t (*fpReq)(SMnodeMsg *pMsg);
int32_t (*fpWrite)(SMnodeMsg *pMsg, int32_t code);
void * pObj;
SMnodeMsg *pMsg;
struct SSdbTable *pTable;
} SSWriteMsg; } SSWriteMsg;
typedef struct { typedef struct {
char *tableName; char * tableName;
int32_t hashSessions; int32_t hashSessions;
int32_t maxRowSize; int32_t maxRowSize;
int32_t refCountPos; int32_t refCountPos;
@ -89,15 +91,15 @@ int32_t sdbDeleteRow(SSWriteMsg *pWrite);
int32_t sdbUpdateRow(SSWriteMsg *pWrite); int32_t sdbUpdateRow(SSWriteMsg *pWrite);
int32_t sdbInsertRowImp(SSWriteMsg *pWrite); int32_t sdbInsertRowImp(SSWriteMsg *pWrite);
void *sdbGetRow(void *handle, void *key); void *sdbGetRow(void *pTable, void *key);
void *sdbFetchRow(void *handle, void *pIter, void **ppRow); void *sdbFetchRow(void *pTable, void *pIter, void **ppRow);
void sdbFreeIter(void *pIter); void sdbFreeIter(void *pIter);
void sdbIncRef(void *thandle, void *pRow); void sdbIncRef(void *pTable, void *pRow);
void sdbDecRef(void *thandle, void *pRow); void sdbDecRef(void *pTable, void *pRow);
int64_t sdbGetNumOfRows(void *handle); int64_t sdbGetNumOfRows(void *pTable);
int32_t sdbGetId(void *handle); int32_t sdbGetId(void *pTable);
uint64_t sdbGetVersion(); uint64_t sdbGetVersion();
bool sdbCheckRowDeleted(void *thandle, void *pRow); bool sdbCheckRowDeleted(void *pTable, void *pRow);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -31,30 +31,30 @@ void * tsAcctSdb = NULL;
static int32_t tsAcctUpdateSize; static int32_t tsAcctUpdateSize;
static int32_t mnodeCreateRootAcct(); static int32_t mnodeCreateRootAcct();
static int32_t mnodeAcctActionDestroy(SSWriteMsg *pOper) { static int32_t mnodeAcctActionDestroy(SSWriteMsg *pWMsg) {
SAcctObj *pAcct = pOper->pObj; SAcctObj *pAcct = pWMsg->pObj;
pthread_mutex_destroy(&pAcct->mutex); pthread_mutex_destroy(&pAcct->mutex);
tfree(pOper->pObj); tfree(pWMsg->pObj);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeAcctActionInsert(SSWriteMsg *pOper) { static int32_t mnodeAcctActionInsert(SSWriteMsg *pWMsg) {
SAcctObj *pAcct = pOper->pObj; SAcctObj *pAcct = pWMsg->pObj;
memset(&pAcct->acctInfo, 0, sizeof(SAcctInfo)); memset(&pAcct->acctInfo, 0, sizeof(SAcctInfo));
pAcct->acctInfo.accessState = TSDB_VN_ALL_ACCCESS; pAcct->acctInfo.accessState = TSDB_VN_ALL_ACCCESS;
pthread_mutex_init(&pAcct->mutex, NULL); pthread_mutex_init(&pAcct->mutex, NULL);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeAcctActionDelete(SSWriteMsg *pOper) { static int32_t mnodeAcctActionDelete(SSWriteMsg *pWMsg) {
SAcctObj *pAcct = pOper->pObj; SAcctObj *pAcct = pWMsg->pObj;
mnodeDropAllUsers(pAcct); mnodeDropAllUsers(pAcct);
mnodeDropAllDbs(pAcct); mnodeDropAllDbs(pAcct);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeAcctActionUpdate(SSWriteMsg *pOper) { static int32_t mnodeAcctActionUpdate(SSWriteMsg *pWMsg) {
SAcctObj *pAcct = pOper->pObj; SAcctObj *pAcct = pWMsg->pObj;
SAcctObj *pSaved = mnodeGetAcct(pAcct->user); SAcctObj *pSaved = mnodeGetAcct(pAcct->user);
if (pAcct != pSaved) { if (pAcct != pSaved) {
memcpy(pSaved, pAcct, tsAcctUpdateSize); memcpy(pSaved, pAcct, tsAcctUpdateSize);
@ -64,19 +64,19 @@ static int32_t mnodeAcctActionUpdate(SSWriteMsg *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeAcctActionEncode(SSWriteMsg *pOper) { static int32_t mnodeAcctActionEncode(SSWriteMsg *pWMsg) {
SAcctObj *pAcct = pOper->pObj; SAcctObj *pAcct = pWMsg->pObj;
memcpy(pOper->rowData, pAcct, tsAcctUpdateSize); memcpy(pWMsg->rowData, pAcct, tsAcctUpdateSize);
pOper->rowSize = tsAcctUpdateSize; pWMsg->rowSize = tsAcctUpdateSize;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeAcctActionDecode(SSWriteMsg *pOper) { static int32_t mnodeAcctActionDecode(SSWriteMsg *pWMsg) {
SAcctObj *pAcct = (SAcctObj *) calloc(1, sizeof(SAcctObj)); SAcctObj *pAcct = (SAcctObj *) calloc(1, sizeof(SAcctObj));
if (pAcct == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; if (pAcct == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
memcpy(pAcct, pOper->rowData, tsAcctUpdateSize); memcpy(pAcct, pWMsg->rowData, tsAcctUpdateSize);
pOper->pObj = pAcct; pWMsg->pObj = pAcct;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -226,13 +226,13 @@ static int32_t mnodeCreateRootAcct() {
pAcct->acctId = sdbGetId(tsAcctSdb); pAcct->acctId = sdbGetId(tsAcctSdb);
pAcct->createdTime = taosGetTimestampMs(); pAcct->createdTime = taosGetTimestampMs();
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsAcctSdb, .pTable = tsAcctSdb,
.pObj = pAcct, .pObj = pAcct,
}; };
return sdbInsertRow(&oper); return sdbInsertRow(&wmsg);
} }
#ifndef _ACCT #ifndef _ACCT

View File

@ -32,36 +32,36 @@ static int32_t mnodeCreateCluster();
static int32_t mnodeGetClusterMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeGetClusterMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveClusters(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeRetrieveClusters(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeClusterActionDestroy(SSWriteMsg *pOper) { static int32_t mnodeClusterActionDestroy(SSWriteMsg *pWMsg) {
tfree(pOper->pObj); tfree(pWMsg->pObj);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeClusterActionInsert(SSWriteMsg *pOper) { static int32_t mnodeClusterActionInsert(SSWriteMsg *pWMsg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeClusterActionDelete(SSWriteMsg *pOper) { static int32_t mnodeClusterActionDelete(SSWriteMsg *pWMsg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeClusterActionUpdate(SSWriteMsg *pOper) { static int32_t mnodeClusterActionUpdate(SSWriteMsg *pWMsg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeClusterActionEncode(SSWriteMsg *pOper) { static int32_t mnodeClusterActionEncode(SSWriteMsg *pWMsg) {
SClusterObj *pCluster = pOper->pObj; SClusterObj *pCluster = pWMsg->pObj;
memcpy(pOper->rowData, pCluster, tsClusterUpdateSize); memcpy(pWMsg->rowData, pCluster, tsClusterUpdateSize);
pOper->rowSize = tsClusterUpdateSize; pWMsg->rowSize = tsClusterUpdateSize;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeClusterActionDecode(SSWriteMsg *pOper) { static int32_t mnodeClusterActionDecode(SSWriteMsg *pWMsg) {
SClusterObj *pCluster = (SClusterObj *) calloc(1, sizeof(SClusterObj)); SClusterObj *pCluster = (SClusterObj *) calloc(1, sizeof(SClusterObj));
if (pCluster == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; if (pCluster == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
memcpy(pCluster, pOper->rowData, tsClusterUpdateSize); memcpy(pCluster, pWMsg->rowData, tsClusterUpdateSize);
pOper->pObj = pCluster; pWMsg->pObj = pCluster;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -145,13 +145,13 @@ static int32_t mnodeCreateCluster() {
mDebug("uid is %s", pCluster->uid); mDebug("uid is %s", pCluster->uid);
} }
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsClusterSdb, .pTable = tsClusterSdb,
.pObj = pCluster, .pObj = pCluster,
}; };
return sdbInsertRow(&oper); return sdbInsertRow(&wmsg);
} }
const char* mnodeGetClusterId() { const char* mnodeGetClusterId() {

View File

@ -56,8 +56,8 @@ static void mnodeDestroyDb(SDbObj *pDb) {
tfree(pDb); tfree(pDb);
} }
static int32_t mnodeDbActionDestroy(SSWriteMsg *pOper) { static int32_t mnodeDbActionDestroy(SSWriteMsg *pWMsg) {
mnodeDestroyDb(pOper->pObj); mnodeDestroyDb(pWMsg->pObj);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -65,8 +65,8 @@ int64_t mnodeGetDbNum() {
return sdbGetNumOfRows(tsDbSdb); return sdbGetNumOfRows(tsDbSdb);
} }
static int32_t mnodeDbActionInsert(SSWriteMsg *pOper) { static int32_t mnodeDbActionInsert(SSWriteMsg *pWMsg) {
SDbObj *pDb = pOper->pObj; SDbObj *pDb = pWMsg->pObj;
SAcctObj *pAcct = mnodeGetAcct(pDb->acct); SAcctObj *pAcct = mnodeGetAcct(pDb->acct);
pthread_mutex_init(&pDb->mutex, NULL); pthread_mutex_init(&pDb->mutex, NULL);
@ -91,8 +91,8 @@ static int32_t mnodeDbActionInsert(SSWriteMsg *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeDbActionDelete(SSWriteMsg *pOper) { static int32_t mnodeDbActionDelete(SSWriteMsg *pWMsg) {
SDbObj *pDb = pOper->pObj; SDbObj *pDb = pWMsg->pObj;
SAcctObj *pAcct = mnodeGetAcct(pDb->acct); SAcctObj *pAcct = mnodeGetAcct(pDb->acct);
mnodeDropAllChildTables(pDb); mnodeDropAllChildTables(pDb);
@ -107,11 +107,11 @@ static int32_t mnodeDbActionDelete(SSWriteMsg *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeDbActionUpdate(SSWriteMsg *pOper) { static int32_t mnodeDbActionUpdate(SSWriteMsg *pWMsg) {
SDbObj *pNew = pOper->pObj; SDbObj *pNew = pWMsg->pObj;
SDbObj *pDb = mnodeGetDb(pNew->name); SDbObj *pDb = mnodeGetDb(pNew->name);
if (pDb != NULL && pNew != pDb) { if (pDb != NULL && pNew != pDb) {
memcpy(pDb, pNew, pOper->rowSize); memcpy(pDb, pNew, pWMsg->rowSize);
free(pNew->vgList); free(pNew->vgList);
free(pNew); free(pNew);
} }
@ -120,19 +120,19 @@ static int32_t mnodeDbActionUpdate(SSWriteMsg *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeDbActionEncode(SSWriteMsg *pOper) { static int32_t mnodeDbActionEncode(SSWriteMsg *pWMsg) {
SDbObj *pDb = pOper->pObj; SDbObj *pDb = pWMsg->pObj;
memcpy(pOper->rowData, pDb, tsDbUpdateSize); memcpy(pWMsg->rowData, pDb, tsDbUpdateSize);
pOper->rowSize = tsDbUpdateSize; pWMsg->rowSize = tsDbUpdateSize;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeDbActionDecode(SSWriteMsg *pOper) { static int32_t mnodeDbActionDecode(SSWriteMsg *pWMsg) {
SDbObj *pDb = (SDbObj *) calloc(1, sizeof(SDbObj)); SDbObj *pDb = (SDbObj *) calloc(1, sizeof(SDbObj));
if (pDb == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; if (pDb == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
memcpy(pDb, pOper->rowData, tsDbUpdateSize); memcpy(pDb, pWMsg->rowData, tsDbUpdateSize);
pOper->pObj = pDb; pWMsg->pObj = pDb;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -412,16 +412,16 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *
pMsg->pDb = pDb; pMsg->pDb = pDb;
mnodeIncDbRef(pDb); mnodeIncDbRef(pDb);
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsDbSdb, .pTable = tsDbSdb,
.pObj = pDb, .pObj = pDb,
.rowSize = sizeof(SDbObj), .rowSize = sizeof(SDbObj),
.pMsg = pMsg, .pMsg = pMsg,
.writeCb = mnodeCreateDbCb .fpWrite = mnodeCreateDbCb
}; };
code = sdbInsertRow(&oper); code = sdbInsertRow(&wmsg);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("db:%s, failed to create, reason:%s", pDb->name, tstrerror(code)); mError("db:%s, failed to create, reason:%s", pDb->name, tstrerror(code));
pMsg->pDb = NULL; pMsg->pDb = NULL;
@ -440,8 +440,8 @@ bool mnodeCheckIsMonitorDB(char *db, char *monitordb) {
} }
#if 0 #if 0
void mnodePrintVgroups(SDbObj *pDb, char *oper) { void mnodePrintVgroups(SDbObj *pDb, char *wmsg) {
mInfo("db:%s, vgroup link from head, oper:%s", pDb->name, oper); mInfo("db:%s, vgroup link from head, wmsg:%s", pDb->name, wmsg);
SVgObj *pVgroup = pDb->pHead; SVgObj *pVgroup = pDb->pHead;
while (pVgroup != NULL) { while (pVgroup != NULL) {
mInfo("vgId:%d", pVgroup->vgId); mInfo("vgId:%d", pVgroup->vgId);
@ -807,13 +807,13 @@ static int32_t mnodeSetDbDropping(SDbObj *pDb) {
if (pDb->status) return TSDB_CODE_SUCCESS; if (pDb->status) return TSDB_CODE_SUCCESS;
pDb->status = true; pDb->status = true;
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsDbSdb, .pTable = tsDbSdb,
.pObj = pDb .pObj = pDb
}; };
int32_t code = sdbUpdateRow(&oper); int32_t code = sdbUpdateRow(&wmsg);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("db:%s, failed to set dropping state, reason:%s", pDb->name, tstrerror(code)); mError("db:%s, failed to set dropping state, reason:%s", pDb->name, tstrerror(code));
} }
@ -1019,15 +1019,15 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) {
if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) { if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) {
pDb->cfg = newCfg; pDb->cfg = newCfg;
pDb->cfgVersion++; pDb->cfgVersion++;
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsDbSdb, .pTable = tsDbSdb,
.pObj = pDb, .pObj = pDb,
.pMsg = pMsg, .pMsg = pMsg,
.writeCb = mnodeAlterDbCb .fpWrite = mnodeAlterDbCb
}; };
code = sdbUpdateRow(&oper); code = sdbUpdateRow(&wmsg);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("db:%s, failed to alter, reason:%s", pDb->name, tstrerror(code)); mError("db:%s, failed to alter, reason:%s", pDb->name, tstrerror(code));
} }
@ -1071,15 +1071,15 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) {
SDbObj *pDb = pMsg->pDb; SDbObj *pDb = pMsg->pDb;
mInfo("db:%s, drop db from sdb", pDb->name); mInfo("db:%s, drop db from sdb", pDb->name);
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsDbSdb, .pTable = tsDbSdb,
.pObj = pDb, .pObj = pDb,
.pMsg = pMsg, .pMsg = pMsg,
.writeCb = mnodeDropDbCb .fpWrite = mnodeDropDbCb
}; };
int32_t code = sdbDeleteRow(&oper); int32_t code = sdbDeleteRow(&wmsg);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("db:%s, failed to drop, reason:%s", pDb->name, tstrerror(code)); mError("db:%s, failed to drop, reason:%s", pDb->name, tstrerror(code));
} }
@ -1134,13 +1134,13 @@ void mnodeDropAllDbs(SAcctObj *pAcct) {
if (pDb->pAcct == pAcct) { if (pDb->pAcct == pAcct) {
mInfo("db:%s, drop db from sdb for acct:%s is dropped", pDb->name, pAcct->user); mInfo("db:%s, drop db from sdb for acct:%s is dropped", pDb->name, pAcct->user);
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_LOCAL, .type = SDB_OPER_LOCAL,
.table = tsDbSdb, .pTable = tsDbSdb,
.pObj = pDb .pObj = pDb
}; };
sdbDeleteRow(&oper); sdbDeleteRow(&wmsg);
numOfDbs++; numOfDbs++;
} }
mnodeDecDbRef(pDb); mnodeDecDbRef(pDb);

View File

@ -87,13 +87,13 @@ static char* offlineReason[] = {
"unknown", "unknown",
}; };
static int32_t mnodeDnodeActionDestroy(SSWriteMsg *pOper) { static int32_t mnodeDnodeActionDestroy(SSWriteMsg *pWMsg) {
tfree(pOper->pObj); tfree(pWMsg->pObj);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeDnodeActionInsert(SSWriteMsg *pOper) { static int32_t mnodeDnodeActionInsert(SSWriteMsg *pWMsg) {
SDnodeObj *pDnode = pOper->pObj; SDnodeObj *pDnode = pWMsg->pObj;
if (pDnode->status != TAOS_DN_STATUS_DROPPING) { if (pDnode->status != TAOS_DN_STATUS_DROPPING) {
pDnode->status = TAOS_DN_STATUS_OFFLINE; pDnode->status = TAOS_DN_STATUS_OFFLINE;
pDnode->lastAccess = tsAccessSquence; pDnode->lastAccess = tsAccessSquence;
@ -107,8 +107,8 @@ static int32_t mnodeDnodeActionInsert(SSWriteMsg *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeDnodeActionDelete(SSWriteMsg *pOper) { static int32_t mnodeDnodeActionDelete(SSWriteMsg *pWMsg) {
SDnodeObj *pDnode = pOper->pObj; SDnodeObj *pDnode = pWMsg->pObj;
#ifndef _SYNC #ifndef _SYNC
mnodeDropAllDnodeVgroups(pDnode); mnodeDropAllDnodeVgroups(pDnode);
@ -121,11 +121,11 @@ static int32_t mnodeDnodeActionDelete(SSWriteMsg *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeDnodeActionUpdate(SSWriteMsg *pOper) { static int32_t mnodeDnodeActionUpdate(SSWriteMsg *pWMsg) {
SDnodeObj *pNew = pOper->pObj; SDnodeObj *pNew = pWMsg->pObj;
SDnodeObj *pDnode = mnodeGetDnode(pNew->dnodeId); SDnodeObj *pDnode = mnodeGetDnode(pNew->dnodeId);
if (pDnode != NULL && pNew != pDnode) { if (pDnode != NULL && pNew != pDnode) {
memcpy(pDnode, pNew, pOper->rowSize); memcpy(pDnode, pNew, pWMsg->rowSize);
free(pNew); free(pNew);
} }
mnodeDecDnodeRef(pDnode); mnodeDecDnodeRef(pDnode);
@ -134,19 +134,19 @@ static int32_t mnodeDnodeActionUpdate(SSWriteMsg *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeDnodeActionEncode(SSWriteMsg *pOper) { static int32_t mnodeDnodeActionEncode(SSWriteMsg *pWMsg) {
SDnodeObj *pDnode = pOper->pObj; SDnodeObj *pDnode = pWMsg->pObj;
memcpy(pOper->rowData, pDnode, tsDnodeUpdateSize); memcpy(pWMsg->rowData, pDnode, tsDnodeUpdateSize);
pOper->rowSize = tsDnodeUpdateSize; pWMsg->rowSize = tsDnodeUpdateSize;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeDnodeActionDecode(SSWriteMsg *pOper) { static int32_t mnodeDnodeActionDecode(SSWriteMsg *pWMsg) {
SDnodeObj *pDnode = (SDnodeObj *) calloc(1, sizeof(SDnodeObj)); SDnodeObj *pDnode = (SDnodeObj *) calloc(1, sizeof(SDnodeObj));
if (pDnode == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; if (pDnode == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
memcpy(pDnode, pOper->rowData, tsDnodeUpdateSize); memcpy(pDnode, pWMsg->rowData, tsDnodeUpdateSize);
pOper->pObj = pDnode; pWMsg->pObj = pDnode;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -296,13 +296,13 @@ void mnodeDecDnodeRef(SDnodeObj *pDnode) {
} }
void mnodeUpdateDnode(SDnodeObj *pDnode) { void mnodeUpdateDnode(SDnodeObj *pDnode) {
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsDnodeSdb, .pTable = tsDnodeSdb,
.pObj = pDnode .pObj = pDnode
}; };
int32_t code = sdbUpdateRow(&oper); int32_t code = sdbUpdateRow(&wmsg);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("dnodeId:%d, failed update", pDnode->dnodeId); mError("dnodeId:%d, failed update", pDnode->dnodeId);
} }
@ -644,15 +644,15 @@ static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) {
tstrncpy(pDnode->dnodeEp, ep, TSDB_EP_LEN); tstrncpy(pDnode->dnodeEp, ep, TSDB_EP_LEN);
taosGetFqdnPortFromEp(ep, pDnode->dnodeFqdn, &pDnode->dnodePort); taosGetFqdnPortFromEp(ep, pDnode->dnodeFqdn, &pDnode->dnodePort);
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsDnodeSdb, .pTable = tsDnodeSdb,
.pObj = pDnode, .pObj = pDnode,
.rowSize = sizeof(SDnodeObj), .rowSize = sizeof(SDnodeObj),
.pMsg = pMsg .pMsg = pMsg
}; };
int32_t code = sdbInsertRow(&oper); int32_t code = sdbInsertRow(&wmsg);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
int dnodeId = pDnode->dnodeId; int dnodeId = pDnode->dnodeId;
tfree(pDnode); tfree(pDnode);
@ -665,14 +665,14 @@ static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) {
} }
int32_t mnodeDropDnode(SDnodeObj *pDnode, void *pMsg) { int32_t mnodeDropDnode(SDnodeObj *pDnode, void *pMsg) {
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsDnodeSdb, .pTable = tsDnodeSdb,
.pObj = pDnode, .pObj = pDnode,
.pMsg = pMsg .pMsg = pMsg
}; };
int32_t code = sdbDeleteRow(&oper); int32_t code = sdbDeleteRow(&wmsg);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("dnode:%d, failed to drop from cluster, result:%s", pDnode->dnodeId, tstrerror(code)); mError("dnode:%d, failed to drop from cluster, result:%s", pDnode->dnodeId, tstrerror(code));
} else { } else {

View File

@ -58,13 +58,13 @@ static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, vo
#define mnodeMnodeDestroyLock() pthread_mutex_destroy(&tsMnodeLock) #define mnodeMnodeDestroyLock() pthread_mutex_destroy(&tsMnodeLock)
#endif #endif
static int32_t mnodeMnodeActionDestroy(SSWriteMsg *pOper) { static int32_t mnodeMnodeActionDestroy(SSWriteMsg *pWMsg) {
tfree(pOper->pObj); tfree(pWMsg->pObj);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeMnodeActionInsert(SSWriteMsg *pOper) { static int32_t mnodeMnodeActionInsert(SSWriteMsg *pWMsg) {
SMnodeObj *pMnode = pOper->pObj; SMnodeObj *pMnode = pWMsg->pObj;
SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId); SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId);
if (pDnode == NULL) return TSDB_CODE_MND_DNODE_NOT_EXIST; if (pDnode == NULL) return TSDB_CODE_MND_DNODE_NOT_EXIST;
@ -76,8 +76,8 @@ static int32_t mnodeMnodeActionInsert(SSWriteMsg *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeMnodeActionDelete(SSWriteMsg *pOper) { static int32_t mnodeMnodeActionDelete(SSWriteMsg *pWMsg) {
SMnodeObj *pMnode = pOper->pObj; SMnodeObj *pMnode = pWMsg->pObj;
SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId); SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId);
if (pDnode == NULL) return TSDB_CODE_MND_DNODE_NOT_EXIST; if (pDnode == NULL) return TSDB_CODE_MND_DNODE_NOT_EXIST;
@ -88,30 +88,30 @@ static int32_t mnodeMnodeActionDelete(SSWriteMsg *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeMnodeActionUpdate(SSWriteMsg *pOper) { static int32_t mnodeMnodeActionUpdate(SSWriteMsg *pWMsg) {
SMnodeObj *pMnode = pOper->pObj; SMnodeObj *pMnode = pWMsg->pObj;
SMnodeObj *pSaved = mnodeGetMnode(pMnode->mnodeId); SMnodeObj *pSaved = mnodeGetMnode(pMnode->mnodeId);
if (pMnode != pSaved) { if (pMnode != pSaved) {
memcpy(pSaved, pMnode, pOper->rowSize); memcpy(pSaved, pMnode, pWMsg->rowSize);
free(pMnode); free(pMnode);
} }
mnodeDecMnodeRef(pSaved); mnodeDecMnodeRef(pSaved);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeMnodeActionEncode(SSWriteMsg *pOper) { static int32_t mnodeMnodeActionEncode(SSWriteMsg *pWMsg) {
SMnodeObj *pMnode = pOper->pObj; SMnodeObj *pMnode = pWMsg->pObj;
memcpy(pOper->rowData, pMnode, tsMnodeUpdateSize); memcpy(pWMsg->rowData, pMnode, tsMnodeUpdateSize);
pOper->rowSize = tsMnodeUpdateSize; pWMsg->rowSize = tsMnodeUpdateSize;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeMnodeActionDecode(SSWriteMsg *pOper) { static int32_t mnodeMnodeActionDecode(SSWriteMsg *pWMsg) {
SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj)); SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj));
if (pMnode == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; if (pMnode == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
memcpy(pMnode, pOper->rowData, tsMnodeUpdateSize); memcpy(pMnode, pWMsg->rowData, tsMnodeUpdateSize);
pOper->pObj = pMnode; pWMsg->pObj = pMnode;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -329,11 +329,11 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
pMnode->mnodeId = dnodeId; pMnode->mnodeId = dnodeId;
pMnode->createdTime = taosGetTimestampMs(); pMnode->createdTime = taosGetTimestampMs();
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsMnodeSdb, .pTable = tsMnodeSdb,
.pObj = pMnode, .pObj = pMnode,
.writeCb = mnodeCreateMnodeCb .fpWrite = mnodeCreateMnodeCb
}; };
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -346,7 +346,7 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
return; return;
} }
code = sdbInsertRow(&oper); code = sdbInsertRow(&wmsg);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("dnode:%d, failed to create mnode, ep:%s reason:%s", dnodeId, dnodeEp, tstrerror(code)); mError("dnode:%d, failed to create mnode, ep:%s reason:%s", dnodeId, dnodeEp, tstrerror(code));
tfree(pMnode); tfree(pMnode);
@ -356,8 +356,8 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
void mnodeDropMnodeLocal(int32_t dnodeId) { void mnodeDropMnodeLocal(int32_t dnodeId) {
SMnodeObj *pMnode = mnodeGetMnode(dnodeId); SMnodeObj *pMnode = mnodeGetMnode(dnodeId);
if (pMnode != NULL) { if (pMnode != NULL) {
SSWriteMsg oper = {.type = SDB_OPER_LOCAL, .table = tsMnodeSdb, .pObj = pMnode}; SSWriteMsg wmsg = {.type = SDB_OPER_LOCAL, .pTable = tsMnodeSdb, .pObj = pMnode};
sdbDeleteRow(&oper); sdbDeleteRow(&wmsg);
mnodeDecMnodeRef(pMnode); mnodeDecMnodeRef(pMnode);
} }
@ -371,13 +371,13 @@ int32_t mnodeDropMnode(int32_t dnodeId) {
return TSDB_CODE_MND_DNODE_NOT_EXIST; return TSDB_CODE_MND_DNODE_NOT_EXIST;
} }
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsMnodeSdb, .pTable = tsMnodeSdb,
.pObj = pMnode .pObj = pMnode
}; };
int32_t code = sdbDeleteRow(&oper); int32_t code = sdbDeleteRow(&wmsg);
sdbDecRef(tsMnodeSdb, pMnode); sdbDecRef(tsMnodeSdb, pMnode);

View File

@ -83,12 +83,12 @@ typedef struct {
typedef struct { typedef struct {
pthread_t thread; pthread_t thread;
int32_t workerId; int32_t workerId;
} SSWriteWorker; } SSdbWorker;
typedef struct { typedef struct {
int32_t num; int32_t num;
SSWriteWorker *worker; SSdbWorker *worker;
} SSWriteWorkerPool; } SSdbWorkerPool;
extern void * tsMnodeTmr; extern void * tsMnodeTmr;
static void * tsSdbTmr; static void * tsSdbTmr;
@ -96,27 +96,28 @@ static SSdbObject tsSdbObj = {0};
static taos_qset tsSdbWQset; static taos_qset tsSdbWQset;
static taos_qall tsSdbWQall; static taos_qall tsSdbWQall;
static taos_queue tsSdbWQueue; static taos_queue tsSdbWQueue;
static SSWriteWorkerPool tsSdbPool; static SSdbWorkerPool tsSdbPool;
static int32_t sdbWrite(void *wparam, void *data, int32_t type, void *pMsg); static int32_t sdbWrite(void *pWrite, void *pHead, int32_t qtype, void *unused);
static int32_t sdbWriteToQueue(void *param, void *data, int32_t type, void *pMsg); static int32_t sdbWriteToQueue(void *pWrite, void *pHead, int32_t qtype, void *unused);
static void * sdbWorkerFp(void *param); static void * sdbWorkerFp(void *pWorker);
static int32_t sdbInitWriteWorker(); static int32_t sdbInitWorker();
static void sdbCleanupWriteWorker(); static void sdbCleanupWorker();
static int32_t sdbAllocWriteQueue(); static int32_t sdbAllocQueue();
static void sdbFreeWritequeue(); static void sdbFreeQueue();
extern int32_t sdbInsertRowImp(SSWriteMsg *pWrite);
static int32_t sdbUpdateRowImp(SSWriteMsg *pWrite); static int32_t sdbUpdateRowImp(SSWriteMsg *pWrite);
static int32_t sdbDeleteRowImp(SSWriteMsg *pWrite); static int32_t sdbDeleteRowImp(SSWriteMsg *pWrite);
static int32_t sdbInsertHash(SSdbTable *pTable, SSWriteMsg *pWrite); static int32_t sdbInsertHash(SSdbTable *pTable, SSWriteMsg *pWrite);
static int32_t sdbUpdateHash(SSdbTable *pTable, SSWriteMsg *pWrite); static int32_t sdbUpdateHash(SSdbTable *pTable, SSWriteMsg *pWrite);
static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite); static int32_t sdbDeleteHash(SSdbTable *pTable, SSWriteMsg *pWrite);
int32_t sdbGetId(void *handle) { int32_t sdbGetId(void *pTable) {
return ((SSdbTable *)handle)->autoIndex; return ((SSdbTable *)pTable)->autoIndex;
} }
int64_t sdbGetNumOfRows(void *handle) { int64_t sdbGetNumOfRows(void *pTable) {
return ((SSdbTable *)handle)->numOfRows; return ((SSdbTable *)pTable)->numOfRows;
} }
uint64_t sdbGetVersion() { uint64_t sdbGetVersion() {
@ -276,27 +277,27 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
SWalHead *pHead = (void *)pWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK; SWalHead *pHead = (void *)pWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK;
int32_t action = pHead->msgType % 10; int32_t action = pHead->msgType % 10;
sdbError("vgId:1, key:%p:%s hver:%" PRIu64 " action:%d, failed to foward since %s", pWrite->pObj, sdbError("vgId:1, key:%p:%s hver:%" PRIu64 " action:%d, failed to foward since %s", pWrite->pObj,
sdbGetKeyStr(pWrite->table, pHead->cont), pHead->version, action, tstrerror(pWrite->retCode)); sdbGetKeyStr(pWrite->pTable, pHead->cont), pHead->version, action, tstrerror(pWrite->retCode));
if (action == SDB_ACTION_INSERT) { if (action == SDB_ACTION_INSERT) {
// It's better to create a table in two stages, create it first and then set it success // It's better to create a table in two stages, create it first and then set it success
//sdbDeleteHash(pWrite->table, pWrite); //sdbDeleteHash(pWrite->pTable, pWrite);
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = pWrite->table, .pTable = pWrite->pTable,
.pObj = pWrite->pObj .pObj = pWrite->pObj
}; };
sdbDeleteRow(&oper); sdbDeleteRow(&wmsg);
} }
} }
if (pWrite->writeCb != NULL) { if (pWrite->fpWrite != NULL) {
pWrite->retCode = (*pWrite->writeCb)(pMsg, pWrite->retCode); pWrite->retCode = (*pWrite->fpWrite)(pMsg, pWrite->retCode);
} }
dnodeSendRpcMWriteRsp(pMsg, pWrite->retCode); dnodeSendRpcMWriteRsp(pMsg, pWrite->retCode);
// if ahandle, means this func is called by sdb write // if ahandle, means this func is called by sdb write
if (ahandle == NULL) { if (ahandle == NULL) {
sdbDecRef(pWrite->table, pWrite->pObj); sdbDecRef(pWrite->pTable, pWrite->pObj);
} }
taosFreeQitem(pWrite); taosFreeQitem(pWrite);
@ -403,7 +404,7 @@ void sdbUpdateSync(void *pMnodes) {
int32_t sdbInit() { int32_t sdbInit() {
pthread_mutex_init(&tsSdbObj.mutex, NULL); pthread_mutex_init(&tsSdbObj.mutex, NULL);
if (sdbInitWriteWorker() != 0) { if (sdbInitWorker() != 0) {
return -1; return -1;
} }
@ -426,7 +427,7 @@ void sdbCleanUp() {
tsSdbObj.status = SDB_STATUS_CLOSING; tsSdbObj.status = SDB_STATUS_CLOSING;
sdbCleanupWriteWorker(); sdbCleanupWorker();
sdbDebug("vgId:1, sdb will be closed, mver:%" PRIu64, tsSdbObj.version); sdbDebug("vgId:1, sdb will be closed, mver:%" PRIu64, tsSdbObj.version);
if (tsSdbObj.sync) { if (tsSdbObj.sync) {
@ -442,19 +443,19 @@ void sdbCleanUp() {
pthread_mutex_destroy(&tsSdbObj.mutex); pthread_mutex_destroy(&tsSdbObj.mutex);
} }
void sdbIncRef(void *handle, void *pObj) { void sdbIncRef(void *tparam, void *pObj) {
if (pObj == NULL || handle == NULL) return; if (pObj == NULL || tparam == NULL) return;
SSdbTable *pTable = handle; SSdbTable *pTable = tparam;
int32_t * pRefCount = (int32_t *)(pObj + pTable->refCountPos); int32_t * pRefCount = (int32_t *)(pObj + pTable->refCountPos);
int32_t refCount = atomic_add_fetch_32(pRefCount, 1); int32_t refCount = atomic_add_fetch_32(pRefCount, 1);
sdbTrace("vgId:1, sdb:%s, inc ref to key:%p:%s:%d", pTable->tableName, pObj, sdbGetObjStr(pTable, pObj), refCount); sdbTrace("vgId:1, sdb:%s, inc ref to key:%p:%s:%d", pTable->tableName, pObj, sdbGetObjStr(pTable, pObj), refCount);
} }
void sdbDecRef(void *handle, void *pObj) { void sdbDecRef(void *tparam, void *pObj) {
if (pObj == NULL || handle == NULL) return; if (pObj == NULL || tparam == NULL) return;
SSdbTable *pTable = handle; SSdbTable *pTable = tparam;
int32_t * pRefCount = (int32_t *)(pObj + pTable->refCountPos); int32_t * pRefCount = (int32_t *)(pObj + pTable->refCountPos);
int32_t refCount = atomic_sub_fetch_32(pRefCount, 1); int32_t refCount = atomic_sub_fetch_32(pRefCount, 1);
sdbTrace("vgId:1, sdb:%s, dec ref to key:%p:%s:%d", pTable->tableName, pObj, sdbGetObjStr(pTable, pObj), refCount); sdbTrace("vgId:1, sdb:%s, dec ref to key:%p:%s:%d", pTable->tableName, pObj, sdbGetObjStr(pTable, pObj), refCount);
@ -462,8 +463,8 @@ void sdbDecRef(void *handle, void *pObj) {
int32_t *updateEnd = pObj + pTable->refCountPos - 4; int32_t *updateEnd = pObj + pTable->refCountPos - 4;
if (refCount <= 0 && *updateEnd) { if (refCount <= 0 && *updateEnd) {
sdbTrace("vgId:1, sdb:%s, key:%p:%s:%d destroyed", pTable->tableName, pObj, sdbGetObjStr(pTable, pObj), refCount); sdbTrace("vgId:1, sdb:%s, key:%p:%s:%d destroyed", pTable->tableName, pObj, sdbGetObjStr(pTable, pObj), refCount);
SSWriteMsg oper = {.pObj = pObj}; SSWriteMsg wmsg = {.pObj = pObj};
(*pTable->fpDestroy)(&oper); (*pTable->fpDestroy)(&wmsg);
} }
} }
@ -485,12 +486,12 @@ static void *sdbGetRowMetaFromObj(SSdbTable *pTable, void *key) {
return sdbGetRowMeta(pTable, sdbGetObjKey(pTable, key)); return sdbGetRowMeta(pTable, sdbGetObjKey(pTable, key));
} }
void *sdbGetRow(void *handle, void *key) { void *sdbGetRow(void *tparam, void *key) {
SSdbTable *pTable = handle; SSdbTable *pTable = tparam;
pthread_mutex_lock(&pTable->mutex); pthread_mutex_lock(&pTable->mutex);
void *pRow = sdbGetRowMeta(handle, key); void *pRow = sdbGetRowMeta(pTable, key);
if (pRow) sdbIncRef(handle, pRow); if (pRow) sdbIncRef(pTable, pRow);
pthread_mutex_unlock(&pTable->mutex); pthread_mutex_unlock(&pTable->mutex);
return pRow; return pRow;
@ -573,9 +574,9 @@ static int32_t sdbUpdateHash(SSdbTable *pTable, SSWriteMsg *pWrite) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) { static int sdbWrite(void *wparam, void *hparam, int32_t qtype, void *unused) {
SSWriteMsg *pWrite = param; SSWriteMsg *pWrite = wparam;
SWalHead *pHead = data; SWalHead *pHead = hparam;
int32_t tableId = pHead->msgType / 10; int32_t tableId = pHead->msgType / 10;
int32_t action = pHead->msgType % 10; int32_t action = pHead->msgType % 10;
@ -593,12 +594,12 @@ static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) {
if (pHead->version <= tsSdbObj.version) { if (pHead->version <= tsSdbObj.version) {
pthread_mutex_unlock(&tsSdbObj.mutex); pthread_mutex_unlock(&tsSdbObj.mutex);
sdbDebug("vgId:1, sdb:%s, failed to restore %s key:%s from source(%d), hver:%" PRIu64 " too large, mver:%" PRIu64, sdbDebug("vgId:1, sdb:%s, failed to restore %s key:%s from source(%d), hver:%" PRIu64 " too large, mver:%" PRIu64,
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version); pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), qtype, pHead->version, tsSdbObj.version);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (pHead->version != tsSdbObj.version + 1) { } else if (pHead->version != tsSdbObj.version + 1) {
pthread_mutex_unlock(&tsSdbObj.mutex); pthread_mutex_unlock(&tsSdbObj.mutex);
sdbError("vgId:1, sdb:%s, failed to restore %s key:%s from source(%d), hver:%" PRIu64 " too large, mver:%" PRIu64, sdbError("vgId:1, sdb:%s, failed to restore %s key:%s from source(%d), hver:%" PRIu64 " too large, mver:%" PRIu64,
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version); pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), qtype, pHead->version, tsSdbObj.version);
return TSDB_CODE_SYN_INVALID_VERSION; return TSDB_CODE_SYN_INVALID_VERSION;
} else { } else {
tsSdbObj.version = pHead->version; tsSdbObj.version = pHead->version;
@ -613,7 +614,7 @@ static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) {
pthread_mutex_unlock(&tsSdbObj.mutex); pthread_mutex_unlock(&tsSdbObj.mutex);
// from app, oper is created // from app, wmsg is created
if (pWrite != NULL) { if (pWrite != NULL) {
// forward to peers // forward to peers
pWrite->processedCount = 0; pWrite->processedCount = 0;
@ -639,11 +640,11 @@ static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) {
// even it is WAL/FWD, it shall be called to update version in sync // even it is WAL/FWD, it shall be called to update version in sync
syncForwardToPeer(tsSdbObj.sync, pHead, pWrite, TAOS_QTYPE_RPC); syncForwardToPeer(tsSdbObj.sync, pHead, pWrite, TAOS_QTYPE_RPC);
// from wal or forward msg, oper not created, should add into hash // from wal or forward msg, wmsg not created, should add into hash
if (action == SDB_ACTION_INSERT) { if (action == SDB_ACTION_INSERT) {
SSWriteMsg oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable}; SSWriteMsg wmsg = {.rowSize = pHead->len, .rowData = pHead->cont, .pTable = pTable};
code = (*pTable->fpDecode)(&oper); code = (*pTable->fpDecode)(&wmsg);
return sdbInsertHash(pTable, &oper); return sdbInsertHash(pTable, &wmsg);
} else if (action == SDB_ACTION_DELETE) { } else if (action == SDB_ACTION_DELETE) {
void *pRow = sdbGetRowMeta(pTable, pHead->cont); void *pRow = sdbGetRowMeta(pTable, pHead->cont);
if (pRow == NULL) { if (pRow == NULL) {
@ -651,8 +652,8 @@ static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) {
sdbGetKeyStr(pTable, pHead->cont)); sdbGetKeyStr(pTable, pHead->cont));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSWriteMsg oper = {.table = pTable, .pObj = pRow}; SSWriteMsg wmsg = {.pTable = pTable, .pObj = pRow};
return sdbDeleteHash(pTable, &oper); return sdbDeleteHash(pTable, &wmsg);
} else if (action == SDB_ACTION_UPDATE) { } else if (action == SDB_ACTION_UPDATE) {
void *pRow = sdbGetRowMeta(pTable, pHead->cont); void *pRow = sdbGetRowMeta(pTable, pHead->cont);
if (pRow == NULL) { if (pRow == NULL) {
@ -660,16 +661,16 @@ static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) {
sdbGetKeyStr(pTable, pHead->cont)); sdbGetKeyStr(pTable, pHead->cont));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSWriteMsg oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable}; SSWriteMsg wmsg = {.rowSize = pHead->len, .rowData = pHead->cont, .pTable = pTable};
code = (*pTable->fpDecode)(&oper); code = (*pTable->fpDecode)(&wmsg);
return sdbUpdateHash(pTable, &oper); return sdbUpdateHash(pTable, &wmsg);
} else { } else {
return TSDB_CODE_MND_INVALID_MSG_TYPE; return TSDB_CODE_MND_INVALID_MSG_TYPE;
} }
} }
int32_t sdbInsertRow(SSWriteMsg *pWrite) { int32_t sdbInsertRow(SSWriteMsg *pWrite) {
SSdbTable *pTable = (SSdbTable *)pWrite->table; SSdbTable *pTable = pWrite->pTable;
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
if (sdbGetRowFromObj(pTable, pWrite->pObj)) { if (sdbGetRowFromObj(pTable, pWrite->pObj)) {
@ -699,15 +700,15 @@ int32_t sdbInsertRow(SSWriteMsg *pWrite) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pWrite->reqFp) { if (pWrite->fpReq) {
return (*pWrite->reqFp)(pWrite->pMsg); return (*pWrite->fpReq)(pWrite->pMsg);
} else { } else {
return sdbInsertRowImp(pWrite); return sdbInsertRowImp(pWrite);
} }
} }
int32_t sdbInsertRowImp(SSWriteMsg *pWrite) { int32_t sdbInsertRowImp(SSWriteMsg *pWrite) {
SSdbTable *pTable = (SSdbTable *)pWrite->table; SSdbTable *pTable = pWrite->pTable;
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
@ -729,14 +730,14 @@ int32_t sdbInsertRowImp(SSWriteMsg *pWrite) {
pNewOper->pMsg, pTable->tableName, pWrite->pObj, sdbGetObjStr(pTable, pWrite->pObj)); pNewOper->pMsg, pTable->tableName, pWrite->pObj, sdbGetObjStr(pTable, pWrite->pObj));
} }
sdbIncRef(pNewOper->table, pNewOper->pObj); sdbIncRef(pNewOper->pTable, pNewOper->pObj);
taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewOper); taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewOper);
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
bool sdbCheckRowDeleted(void *pTableInput, void *pRow) { bool sdbCheckRowDeleted(void *tparam, void *pRow) {
SSdbTable *pTable = pTableInput; SSdbTable *pTable = tparam;
if (pTable == NULL) return false; if (pTable == NULL) return false;
int32_t *updateEnd = pRow + pTable->refCountPos - 4; int32_t *updateEnd = pRow + pTable->refCountPos - 4;
@ -744,7 +745,7 @@ bool sdbCheckRowDeleted(void *pTableInput, void *pRow) {
} }
int32_t sdbDeleteRow(SSWriteMsg *pWrite) { int32_t sdbDeleteRow(SSWriteMsg *pWrite) {
SSdbTable *pTable = (SSdbTable *)pWrite->table; SSdbTable *pTable = pWrite->pTable;
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
void *pRow = sdbGetRowMetaFromObj(pTable, pWrite->pObj); void *pRow = sdbGetRowMetaFromObj(pTable, pWrite->pObj);
@ -768,15 +769,15 @@ int32_t sdbDeleteRow(SSWriteMsg *pWrite) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pWrite->reqFp) { if (pWrite->fpReq) {
return (*pWrite->reqFp)(pWrite->pMsg); return (*pWrite->fpReq)(pWrite->pMsg);
} else { } else {
return sdbDeleteRowImp(pWrite); return sdbDeleteRowImp(pWrite);
} }
} }
int32_t sdbDeleteRowImp(SSWriteMsg *pWrite) { int32_t sdbDeleteRowImp(SSWriteMsg *pWrite) {
SSdbTable *pTable = (SSdbTable *)pWrite->table; SSdbTable *pTable = pWrite->pTable;
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
@ -803,7 +804,7 @@ int32_t sdbDeleteRowImp(SSWriteMsg *pWrite) {
} }
int32_t sdbUpdateRow(SSWriteMsg *pWrite) { int32_t sdbUpdateRow(SSWriteMsg *pWrite) {
SSdbTable *pTable = (SSdbTable *)pWrite->table; SSdbTable *pTable = pWrite->pTable;
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
void *pRow = sdbGetRowMetaFromObj(pTable, pWrite->pObj); void *pRow = sdbGetRowMetaFromObj(pTable, pWrite->pObj);
@ -823,15 +824,15 @@ int32_t sdbUpdateRow(SSWriteMsg *pWrite) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pWrite->reqFp) { if (pWrite->fpReq) {
return (*pWrite->reqFp)(pWrite->pMsg); return (*pWrite->fpReq)(pWrite->pMsg);
} else { } else {
return sdbUpdateRowImp(pWrite); return sdbUpdateRowImp(pWrite);
} }
} }
int32_t sdbUpdateRowImp(SSWriteMsg *pWrite) { int32_t sdbUpdateRowImp(SSWriteMsg *pWrite) {
SSdbTable *pTable = (SSdbTable *)pWrite->table; SSdbTable *pTable = pWrite->pTable;
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; int32_t size = sizeof(SSWriteMsg) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
@ -852,14 +853,14 @@ int32_t sdbUpdateRowImp(SSWriteMsg *pWrite) {
pNewOper->pMsg, pTable->tableName, pWrite->pObj, sdbGetObjStr(pTable, pWrite->pObj)); pNewOper->pMsg, pTable->tableName, pWrite->pObj, sdbGetObjStr(pTable, pWrite->pObj));
} }
sdbIncRef(pNewOper->table, pNewOper->pObj); sdbIncRef(pNewOper->pTable, pNewOper->pObj);
taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewOper); taosWriteQitem(tsSdbWQueue, TAOS_QTYPE_RPC, pNewOper);
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
void *sdbFetchRow(void *handle, void *pNode, void **ppRow) { void *sdbFetchRow(void *tparam, void *pNode, void **ppRow) {
SSdbTable *pTable = (SSdbTable *)handle; SSdbTable *pTable = tparam;
*ppRow = NULL; *ppRow = NULL;
if (pTable == NULL) return NULL; if (pTable == NULL) return NULL;
@ -880,7 +881,7 @@ void *sdbFetchRow(void *handle, void *pNode, void **ppRow) {
} }
*ppRow = *ppMetaRow; *ppRow = *ppMetaRow;
sdbIncRef(handle, *ppMetaRow); sdbIncRef(pTable, *ppMetaRow);
return pIter; return pIter;
} }
@ -934,12 +935,12 @@ void sdbCloseTable(void *handle) {
void **ppRow = taosHashIterGet(pIter); void **ppRow = taosHashIterGet(pIter);
if (ppRow == NULL) continue; if (ppRow == NULL) continue;
SSWriteMsg oper = { SSWriteMsg wmsg = {
.pObj = *ppRow, .pObj = *ppRow,
.table = pTable, .pTable = pTable,
}; };
(*pTable->fpDestroy)(&oper); (*pTable->fpDestroy)(&wmsg);
} }
taosHashDestroyIter(pIter); taosHashDestroyIter(pIter);
@ -950,44 +951,44 @@ void sdbCloseTable(void *handle) {
free(pTable); free(pTable);
} }
int32_t sdbInitWriteWorker() { int32_t sdbInitWorker() {
tsSdbPool.num = 1; tsSdbPool.num = 1;
tsSdbPool.worker = (SSWriteWorker *)calloc(sizeof(SSWriteWorker), tsSdbPool.num); tsSdbPool.worker = calloc(sizeof(SSdbWorker), tsSdbPool.num);
if (tsSdbPool.worker == NULL) return -1; if (tsSdbPool.worker == NULL) return -1;
for (int32_t i = 0; i < tsSdbPool.num; ++i) { for (int32_t i = 0; i < tsSdbPool.num; ++i) {
SSWriteWorker *pWorker = tsSdbPool.worker + i; SSdbWorker *pWorker = tsSdbPool.worker + i;
pWorker->workerId = i; pWorker->workerId = i;
} }
sdbAllocWriteQueue(); sdbAllocQueue();
mInfo("vgId:1, sdb write is opened"); mInfo("vgId:1, sdb write is opened");
return 0; return 0;
} }
void sdbCleanupWriteWorker() { void sdbCleanupWorker() {
for (int32_t i = 0; i < tsSdbPool.num; ++i) { for (int32_t i = 0; i < tsSdbPool.num; ++i) {
SSWriteWorker *pWorker = tsSdbPool.worker + i; SSdbWorker *pWorker = tsSdbPool.worker + i;
if (pWorker->thread) { if (pWorker->thread) {
taosQsetThreadResume(tsSdbWQset); taosQsetThreadResume(tsSdbWQset);
} }
} }
for (int32_t i = 0; i < tsSdbPool.num; ++i) { for (int32_t i = 0; i < tsSdbPool.num; ++i) {
SSWriteWorker *pWorker = tsSdbPool.worker + i; SSdbWorker *pWorker = tsSdbPool.worker + i;
if (pWorker->thread) { if (pWorker->thread) {
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
} }
} }
sdbFreeWritequeue(); sdbFreeQueue();
tfree(tsSdbPool.worker); tfree(tsSdbPool.worker);
mInfo("vgId:1, sdb write is closed"); mInfo("vgId:1, sdb write is closed");
} }
int32_t sdbAllocWriteQueue() { int32_t sdbAllocQueue() {
tsSdbWQueue = taosOpenQueue(); tsSdbWQueue = taosOpenQueue();
if (tsSdbWQueue == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; if (tsSdbWQueue == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
@ -1006,7 +1007,7 @@ int32_t sdbAllocWriteQueue() {
} }
for (int32_t i = 0; i < tsSdbPool.num; ++i) { for (int32_t i = 0; i < tsSdbPool.num; ++i) {
SSWriteWorker *pWorker = tsSdbPool.worker + i; SSdbWorker *pWorker = tsSdbPool.worker + i;
pWorker->workerId = i; pWorker->workerId = i;
pthread_attr_t thAttr; pthread_attr_t thAttr;
@ -1029,7 +1030,7 @@ int32_t sdbAllocWriteQueue() {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void sdbFreeWritequeue() { void sdbFreeQueue() {
taosCloseQueue(tsSdbWQueue); taosCloseQueue(tsSdbWQueue);
taosFreeQall(tsSdbWQall); taosFreeQall(tsSdbWQall);
taosCloseQset(tsSdbWQset); taosCloseQset(tsSdbWQset);
@ -1038,8 +1039,8 @@ void sdbFreeWritequeue() {
tsSdbWQueue = NULL; tsSdbWQueue = NULL;
} }
int32_t sdbWriteToQueue(void *param, void *data, int32_t qtype, void *pMsg) { int32_t sdbWriteToQueue(void *wparam, void *hparam, int32_t qtype, void *unsed) {
SWalHead *pHead = data; SWalHead *pHead = hparam;
int32_t size = sizeof(SWalHead) + pHead->len; int32_t size = sizeof(SWalHead) + pHead->len;
SWalHead *pWal = taosAllocateQitem(size); SWalHead *pWal = taosAllocateQitem(size);
memcpy(pWal, pHead, size); memcpy(pWal, pHead, size);
@ -1048,10 +1049,10 @@ int32_t sdbWriteToQueue(void *param, void *data, int32_t qtype, void *pMsg) {
return 0; return 0;
} }
static void *sdbWorkerFp(void *param) { static void *sdbWorkerFp(void *pWorker) {
SWalHead *pHead; SWalHead *pHead;
SSWriteMsg *pWrite; SSWriteMsg *pWrite;
int32_t type; int32_t qtype;
int32_t numOfMsgs; int32_t numOfMsgs;
void * item; void * item;
void * unUsed; void * unUsed;
@ -1064,22 +1065,22 @@ static void *sdbWorkerFp(void *param) {
} }
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(tsSdbWQall, &type, &item); taosGetQitem(tsSdbWQall, &qtype, &item);
if (type == TAOS_QTYPE_RPC) { if (qtype == TAOS_QTYPE_RPC) {
pWrite = (SSWriteMsg *)item; pWrite = (SSWriteMsg *)item;
pWrite->processedCount = 1; pWrite->processedCount = 1;
pHead = (void *)pWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK; pHead = (void *)pWrite + sizeof(SSWriteMsg) + SDB_SYNC_HACK;
if (pWrite->pMsg != NULL) { if (pWrite->pMsg != NULL) {
sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s hver:%" PRIu64 ", will be processed in sdb queue", sdbDebug("vgId:1, ahandle:%p msg:%p, sdb:%s key:%p:%s hver:%" PRIu64 ", will be processed in sdb queue",
pWrite->pMsg->rpcMsg.ahandle, pWrite->pMsg, ((SSdbTable *)pWrite->table)->tableName, pWrite->pObj, pWrite->pMsg->rpcMsg.ahandle, pWrite->pMsg, pWrite->pTable->tableName, pWrite->pObj,
sdbGetKeyStr(pWrite->table, pHead->cont), pHead->version); sdbGetKeyStr(pWrite->pTable, pHead->cont), pHead->version);
} }
} else { } else {
pHead = (SWalHead *)item; pHead = (SWalHead *)item;
pWrite = NULL; pWrite = NULL;
} }
int32_t code = sdbWrite(pWrite, pHead, type, NULL); int32_t code = sdbWrite(pWrite, pHead, qtype, NULL);
if (code > 0) code = 0; if (code > 0) code = 0;
if (pWrite) { if (pWrite) {
pWrite->retCode = code; pWrite->retCode = code;
@ -1093,12 +1094,12 @@ static void *sdbWorkerFp(void *param) {
// browse all items, and process them one by one // browse all items, and process them one by one
taosResetQitems(tsSdbWQall); taosResetQitems(tsSdbWQall);
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(tsSdbWQall, &type, &item); taosGetQitem(tsSdbWQall, &qtype, &item);
if (type == TAOS_QTYPE_RPC) { if (qtype == TAOS_QTYPE_RPC) {
pWrite = (SSWriteMsg *)item; pWrite = (SSWriteMsg *)item;
sdbConfirmForward(NULL, pWrite, pWrite->retCode); sdbConfirmForward(NULL, pWrite, pWrite->retCode);
} else if (type == TAOS_QTYPE_FWD) { } else if (qtype == TAOS_QTYPE_FWD) {
pHead = (SWalHead *)item; pHead = (SWalHead *)item;
syncConfirmForward(tsSdbObj.sync, pHead->version, pHead->len); syncConfirmForward(tsSdbObj.sync, pHead->version, pHead->len);
taosFreeQitem(item); taosFreeQitem(item);

View File

@ -99,13 +99,13 @@ static void mnodeDestroyChildTable(SCTableObj *pTable) {
tfree(pTable); tfree(pTable);
} }
static int32_t mnodeChildTableActionDestroy(SSWriteMsg *pOper) { static int32_t mnodeChildTableActionDestroy(SSWriteMsg *pWMsg) {
mnodeDestroyChildTable(pOper->pObj); mnodeDestroyChildTable(pWMsg->pObj);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeChildTableActionInsert(SSWriteMsg *pOper) { static int32_t mnodeChildTableActionInsert(SSWriteMsg *pWMsg) {
SCTableObj *pTable = pOper->pObj; SCTableObj *pTable = pWMsg->pObj;
SVgObj *pVgroup = mnodeGetVgroup(pTable->vgId); SVgObj *pVgroup = mnodeGetVgroup(pTable->vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
@ -153,8 +153,8 @@ static int32_t mnodeChildTableActionInsert(SSWriteMsg *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeChildTableActionDelete(SSWriteMsg *pOper) { static int32_t mnodeChildTableActionDelete(SSWriteMsg *pWMsg) {
SCTableObj *pTable = pOper->pObj; SCTableObj *pTable = pWMsg->pObj;
if (pTable->vgId == 0) { if (pTable->vgId == 0) {
return TSDB_CODE_MND_VGROUP_NOT_EXIST; return TSDB_CODE_MND_VGROUP_NOT_EXIST;
} }
@ -189,8 +189,8 @@ static int32_t mnodeChildTableActionDelete(SSWriteMsg *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeChildTableActionUpdate(SSWriteMsg *pOper) { static int32_t mnodeChildTableActionUpdate(SSWriteMsg *pWMsg) {
SCTableObj *pNew = pOper->pObj; SCTableObj *pNew = pWMsg->pObj;
SCTableObj *pTable = mnodeGetChildTable(pNew->info.tableId); SCTableObj *pTable = mnodeGetChildTable(pNew->info.tableId);
if (pTable != pNew) { if (pTable != pNew) {
void *oldTableId = pTable->info.tableId; void *oldTableId = pTable->info.tableId;
@ -216,50 +216,50 @@ static int32_t mnodeChildTableActionUpdate(SSWriteMsg *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeChildTableActionEncode(SSWriteMsg *pOper) { static int32_t mnodeChildTableActionEncode(SSWriteMsg *pWMsg) {
SCTableObj *pTable = pOper->pObj; SCTableObj *pTable = pWMsg->pObj;
assert(pTable != NULL && pOper->rowData != NULL); assert(pTable != NULL && pWMsg->rowData != NULL);
int32_t len = strlen(pTable->info.tableId); int32_t len = strlen(pTable->info.tableId);
if (len >= TSDB_TABLE_FNAME_LEN) return TSDB_CODE_MND_INVALID_TABLE_ID; if (len >= TSDB_TABLE_FNAME_LEN) return TSDB_CODE_MND_INVALID_TABLE_ID;
memcpy(pOper->rowData, pTable->info.tableId, len); memcpy(pWMsg->rowData, pTable->info.tableId, len);
memset(pOper->rowData + len, 0, 1); memset(pWMsg->rowData + len, 0, 1);
len++; len++;
memcpy(pOper->rowData + len, (char*)pTable + sizeof(char *), tsChildTableUpdateSize); memcpy(pWMsg->rowData + len, (char*)pTable + sizeof(char *), tsChildTableUpdateSize);
len += tsChildTableUpdateSize; len += tsChildTableUpdateSize;
if (pTable->info.type != TSDB_CHILD_TABLE) { if (pTable->info.type != TSDB_CHILD_TABLE) {
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
memcpy(pOper->rowData + len, pTable->schema, schemaSize); memcpy(pWMsg->rowData + len, pTable->schema, schemaSize);
len += schemaSize; len += schemaSize;
if (pTable->sqlLen != 0) { if (pTable->sqlLen != 0) {
memcpy(pOper->rowData + len, pTable->sql, pTable->sqlLen); memcpy(pWMsg->rowData + len, pTable->sql, pTable->sqlLen);
len += pTable->sqlLen; len += pTable->sqlLen;
} }
} }
pOper->rowSize = len; pWMsg->rowSize = len;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeChildTableActionDecode(SSWriteMsg *pOper) { static int32_t mnodeChildTableActionDecode(SSWriteMsg *pWMsg) {
assert(pOper->rowData != NULL); assert(pWMsg->rowData != NULL);
SCTableObj *pTable = calloc(1, sizeof(SCTableObj)); SCTableObj *pTable = calloc(1, sizeof(SCTableObj));
if (pTable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; if (pTable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
int32_t len = strlen(pOper->rowData); int32_t len = strlen(pWMsg->rowData);
if (len >= TSDB_TABLE_FNAME_LEN) { if (len >= TSDB_TABLE_FNAME_LEN) {
free(pTable); free(pTable);
return TSDB_CODE_MND_INVALID_TABLE_ID; return TSDB_CODE_MND_INVALID_TABLE_ID;
} }
pTable->info.tableId = strdup(pOper->rowData); pTable->info.tableId = strdup(pWMsg->rowData);
len++; len++;
memcpy((char*)pTable + sizeof(char *), pOper->rowData + len, tsChildTableUpdateSize); memcpy((char*)pTable + sizeof(char *), pWMsg->rowData + len, tsChildTableUpdateSize);
len += tsChildTableUpdateSize; len += tsChildTableUpdateSize;
if (pTable->info.type != TSDB_CHILD_TABLE) { if (pTable->info.type != TSDB_CHILD_TABLE) {
@ -269,7 +269,7 @@ static int32_t mnodeChildTableActionDecode(SSWriteMsg *pOper) {
mnodeDestroyChildTable(pTable); mnodeDestroyChildTable(pTable);
return TSDB_CODE_MND_INVALID_TABLE_TYPE; return TSDB_CODE_MND_INVALID_TABLE_TYPE;
} }
memcpy(pTable->schema, pOper->rowData + len, schemaSize); memcpy(pTable->schema, pWMsg->rowData + len, schemaSize);
len += schemaSize; len += schemaSize;
if (pTable->sqlLen != 0) { if (pTable->sqlLen != 0) {
@ -278,11 +278,11 @@ static int32_t mnodeChildTableActionDecode(SSWriteMsg *pOper) {
mnodeDestroyChildTable(pTable); mnodeDestroyChildTable(pTable);
return TSDB_CODE_MND_OUT_OF_MEMORY; return TSDB_CODE_MND_OUT_OF_MEMORY;
} }
memcpy(pTable->sql, pOper->rowData + len, pTable->sqlLen); memcpy(pTable->sql, pWMsg->rowData + len, pTable->sqlLen);
} }
} }
pOper->pObj = pTable; pWMsg->pObj = pTable;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -297,7 +297,7 @@ static int32_t mnodeChildTableActionRestored() {
SDbObj *pDb = mnodeGetDbByTableId(pTable->info.tableId); SDbObj *pDb = mnodeGetDbByTableId(pTable->info.tableId);
if (pDb == NULL || pDb->status != TSDB_DB_STATUS_READY) { if (pDb == NULL || pDb->status != TSDB_DB_STATUS_READY) {
mError("ctable:%s, failed to get db or db in dropping, discard it", pTable->info.tableId); mError("ctable:%s, failed to get db or db in dropping, discard it", pTable->info.tableId);
SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb}; SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .pTable = tsChildTableSdb};
sdbDeleteRow(&desc); sdbDeleteRow(&desc);
mnodeDecTableRef(pTable); mnodeDecTableRef(pTable);
mnodeDecDbRef(pDb); mnodeDecDbRef(pDb);
@ -309,7 +309,7 @@ static int32_t mnodeChildTableActionRestored() {
if (pVgroup == NULL) { if (pVgroup == NULL) {
mError("ctable:%s, failed to get vgId:%d tid:%d, discard it", pTable->info.tableId, pTable->vgId, pTable->tid); mError("ctable:%s, failed to get vgId:%d tid:%d, discard it", pTable->info.tableId, pTable->vgId, pTable->tid);
pTable->vgId = 0; pTable->vgId = 0;
SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb}; SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .pTable = tsChildTableSdb};
sdbDeleteRow(&desc); sdbDeleteRow(&desc);
mnodeDecTableRef(pTable); mnodeDecTableRef(pTable);
continue; continue;
@ -320,7 +320,7 @@ static int32_t mnodeChildTableActionRestored() {
mError("ctable:%s, db:%s not match with vgId:%d db:%s sid:%d, discard it", mError("ctable:%s, db:%s not match with vgId:%d db:%s sid:%d, discard it",
pTable->info.tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->tid); pTable->info.tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->tid);
pTable->vgId = 0; pTable->vgId = 0;
SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb}; SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .pTable = tsChildTableSdb};
sdbDeleteRow(&desc); sdbDeleteRow(&desc);
mnodeDecTableRef(pTable); mnodeDecTableRef(pTable);
continue; continue;
@ -331,7 +331,7 @@ static int32_t mnodeChildTableActionRestored() {
if (pSuperTable == NULL) { if (pSuperTable == NULL) {
mError("ctable:%s, stable:%" PRIu64 " not exist", pTable->info.tableId, pTable->suid); mError("ctable:%s, stable:%" PRIu64 " not exist", pTable->info.tableId, pTable->suid);
pTable->vgId = 0; pTable->vgId = 0;
SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb}; SSWriteMsg desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .pTable = tsChildTableSdb};
sdbDeleteRow(&desc); sdbDeleteRow(&desc);
mnodeDecTableRef(pTable); mnodeDecTableRef(pTable);
continue; continue;
@ -430,13 +430,13 @@ static void mnodeDestroySuperTable(SSTableObj *pStable) {
tfree(pStable); tfree(pStable);
} }
static int32_t mnodeSuperTableActionDestroy(SSWriteMsg *pOper) { static int32_t mnodeSuperTableActionDestroy(SSWriteMsg *pWMsg) {
mnodeDestroySuperTable(pOper->pObj); mnodeDestroySuperTable(pWMsg->pObj);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeSuperTableActionInsert(SSWriteMsg *pOper) { static int32_t mnodeSuperTableActionInsert(SSWriteMsg *pWMsg) {
SSTableObj *pStable = pOper->pObj; SSTableObj *pStable = pWMsg->pObj;
SDbObj *pDb = mnodeGetDbByTableId(pStable->info.tableId); SDbObj *pDb = mnodeGetDbByTableId(pStable->info.tableId);
if (pDb != NULL && pDb->status == TSDB_DB_STATUS_READY) { if (pDb != NULL && pDb->status == TSDB_DB_STATUS_READY) {
mnodeAddSuperTableIntoDb(pDb); mnodeAddSuperTableIntoDb(pDb);
@ -446,8 +446,8 @@ static int32_t mnodeSuperTableActionInsert(SSWriteMsg *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeSuperTableActionDelete(SSWriteMsg *pOper) { static int32_t mnodeSuperTableActionDelete(SSWriteMsg *pWMsg) {
SSTableObj *pStable = pOper->pObj; SSTableObj *pStable = pWMsg->pObj;
SDbObj *pDb = mnodeGetDbByTableId(pStable->info.tableId); SDbObj *pDb = mnodeGetDbByTableId(pStable->info.tableId);
if (pDb != NULL) { if (pDb != NULL) {
mnodeRemoveSuperTableFromDb(pDb); mnodeRemoveSuperTableFromDb(pDb);
@ -458,8 +458,8 @@ static int32_t mnodeSuperTableActionDelete(SSWriteMsg *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeSuperTableActionUpdate(SSWriteMsg *pOper) { static int32_t mnodeSuperTableActionUpdate(SSWriteMsg *pWMsg) {
SSTableObj *pNew = pOper->pObj; SSTableObj *pNew = pWMsg->pObj;
SSTableObj *pTable = mnodeGetSuperTable(pNew->info.tableId); SSTableObj *pTable = mnodeGetSuperTable(pNew->info.tableId);
if (pTable != NULL && pTable != pNew) { if (pTable != NULL && pTable != pNew) {
void *oldTableId = pTable->info.tableId; void *oldTableId = pTable->info.tableId;
@ -483,43 +483,43 @@ static int32_t mnodeSuperTableActionUpdate(SSWriteMsg *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeSuperTableActionEncode(SSWriteMsg *pOper) { static int32_t mnodeSuperTableActionEncode(SSWriteMsg *pWMsg) {
SSTableObj *pStable = pOper->pObj; SSTableObj *pStable = pWMsg->pObj;
assert(pOper->pObj != NULL && pOper->rowData != NULL); assert(pWMsg->pObj != NULL && pWMsg->rowData != NULL);
int32_t len = strlen(pStable->info.tableId); int32_t len = strlen(pStable->info.tableId);
if (len >= TSDB_TABLE_FNAME_LEN) len = TSDB_CODE_MND_INVALID_TABLE_ID; if (len >= TSDB_TABLE_FNAME_LEN) len = TSDB_CODE_MND_INVALID_TABLE_ID;
memcpy(pOper->rowData, pStable->info.tableId, len); memcpy(pWMsg->rowData, pStable->info.tableId, len);
memset(pOper->rowData + len, 0, 1); memset(pWMsg->rowData + len, 0, 1);
len++; len++;
memcpy(pOper->rowData + len, (char*)pStable + sizeof(char *), tsSuperTableUpdateSize); memcpy(pWMsg->rowData + len, (char*)pStable + sizeof(char *), tsSuperTableUpdateSize);
len += tsSuperTableUpdateSize; len += tsSuperTableUpdateSize;
int32_t schemaSize = sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags); int32_t schemaSize = sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags);
memcpy(pOper->rowData + len, pStable->schema, schemaSize); memcpy(pWMsg->rowData + len, pStable->schema, schemaSize);
len += schemaSize; len += schemaSize;
pOper->rowSize = len; pWMsg->rowSize = len;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeSuperTableActionDecode(SSWriteMsg *pOper) { static int32_t mnodeSuperTableActionDecode(SSWriteMsg *pWMsg) {
assert(pOper->rowData != NULL); assert(pWMsg->rowData != NULL);
SSTableObj *pStable = (SSTableObj *) calloc(1, sizeof(SSTableObj)); SSTableObj *pStable = (SSTableObj *) calloc(1, sizeof(SSTableObj));
if (pStable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; if (pStable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
int32_t len = strlen(pOper->rowData); int32_t len = strlen(pWMsg->rowData);
if (len >= TSDB_TABLE_FNAME_LEN){ if (len >= TSDB_TABLE_FNAME_LEN){
free(pStable); free(pStable);
return TSDB_CODE_MND_INVALID_TABLE_ID; return TSDB_CODE_MND_INVALID_TABLE_ID;
} }
pStable->info.tableId = strdup(pOper->rowData); pStable->info.tableId = strdup(pWMsg->rowData);
len++; len++;
memcpy((char*)pStable + sizeof(char *), pOper->rowData + len, tsSuperTableUpdateSize); memcpy((char*)pStable + sizeof(char *), pWMsg->rowData + len, tsSuperTableUpdateSize);
len += tsSuperTableUpdateSize; len += tsSuperTableUpdateSize;
int32_t schemaSize = sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags); int32_t schemaSize = sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags);
@ -529,9 +529,9 @@ static int32_t mnodeSuperTableActionDecode(SSWriteMsg *pOper) {
return TSDB_CODE_MND_NOT_SUPER_TABLE; return TSDB_CODE_MND_NOT_SUPER_TABLE;
} }
memcpy(pStable->schema, pOper->rowData + len, schemaSize); memcpy(pStable->schema, pWMsg->rowData + len, schemaSize);
pOper->pObj = pStable; pWMsg->pObj = pStable;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -828,7 +828,7 @@ static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
} else { } else {
mError("app:%p:%p, stable:%s, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, mError("app:%p:%p, stable:%s, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
tstrerror(code)); tstrerror(code));
SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsSuperTableSdb}; SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .pTable = tsSuperTableSdb};
sdbDeleteRow(&desc); sdbDeleteRow(&desc);
} }
@ -878,16 +878,16 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
pMsg->pTable = (STableObj *)pStable; pMsg->pTable = (STableObj *)pStable;
mnodeIncTableRef(pMsg->pTable); mnodeIncTableRef(pMsg->pTable);
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.rowSize = sizeof(SSTableObj) + schemaSize, .rowSize = sizeof(SSTableObj) + schemaSize,
.pMsg = pMsg, .pMsg = pMsg,
.writeCb = mnodeCreateSuperTableCb .fpWrite = mnodeCreateSuperTableCb
}; };
int32_t code = sdbInsertRow(&oper); int32_t code = sdbInsertRow(&wmsg);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mnodeDestroySuperTable(pStable); mnodeDestroySuperTable(pStable);
pMsg->pTable = NULL; pMsg->pTable = NULL;
@ -937,15 +937,15 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
mnodeDropAllChildTablesInStable(pStable); mnodeDropAllChildTablesInStable(pStable);
} }
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.writeCb = mnodeDropSuperTableCb .fpWrite = mnodeDropSuperTableCb
}; };
int32_t code = sdbDeleteRow(&oper); int32_t code = sdbDeleteRow(&wmsg);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("app:%p:%p, table:%s, failed to drop, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, mError("app:%p:%p, table:%s, failed to drop, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId,
tstrerror(code)); tstrerror(code));
@ -1010,15 +1010,15 @@ static int32_t mnodeAddSuperTableTag(SMnodeMsg *pMsg, SSchema schema[], int32_t
mInfo("app:%p:%p, stable %s, start to add tag %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, mInfo("app:%p:%p, stable %s, start to add tag %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId,
schema[0].name); schema[0].name);
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.writeCb = mnodeAddSuperTableTagCb .fpWrite = mnodeAddSuperTableTagCb
}; };
return sdbUpdateRow(&oper); return sdbUpdateRow(&wmsg);
} }
static int32_t mnodeDropSuperTableTagCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeDropSuperTableTagCb(SMnodeMsg *pMsg, int32_t code) {
@ -1044,15 +1044,15 @@ static int32_t mnodeDropSuperTableTag(SMnodeMsg *pMsg, char *tagName) {
mInfo("app:%p:%p, stable %s, start to drop tag %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, tagName); mInfo("app:%p:%p, stable %s, start to drop tag %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, tagName);
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.writeCb = mnodeDropSuperTableTagCb .fpWrite = mnodeDropSuperTableTagCb
}; };
return sdbUpdateRow(&oper); return sdbUpdateRow(&wmsg);
} }
static int32_t mnodeModifySuperTableTagNameCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeModifySuperTableTagNameCb(SMnodeMsg *pMsg, int32_t code) {
@ -1088,15 +1088,15 @@ static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, char *oldTagName, c
mInfo("app:%p:%p, stable %s, start to modify tag %s to %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, mInfo("app:%p:%p, stable %s, start to modify tag %s to %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId,
oldTagName, newTagName); oldTagName, newTagName);
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.writeCb = mnodeModifySuperTableTagNameCb .fpWrite = mnodeModifySuperTableTagNameCb
}; };
return sdbUpdateRow(&oper); return sdbUpdateRow(&wmsg);
} }
static int32_t mnodeFindSuperTableColumnIndex(SSTableObj *pStable, char *colName) { static int32_t mnodeFindSuperTableColumnIndex(SSTableObj *pStable, char *colName) {
@ -1162,15 +1162,15 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32
mInfo("app:%p:%p, stable %s, start to add column", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId); mInfo("app:%p:%p, stable %s, start to add column", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId);
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.writeCb = mnodeAddSuperTableColumnCb .fpWrite = mnodeAddSuperTableColumnCb
}; };
return sdbUpdateRow(&oper); return sdbUpdateRow(&wmsg);
} }
static int32_t mnodeDropSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeDropSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
@ -1207,15 +1207,15 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) {
mInfo("app:%p:%p, stable %s, start to delete column", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId); mInfo("app:%p:%p, stable %s, start to delete column", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId);
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.writeCb = mnodeDropSuperTableColumnCb .fpWrite = mnodeDropSuperTableColumnCb
}; };
return sdbUpdateRow(&oper); return sdbUpdateRow(&wmsg);
} }
static int32_t mnodeChangeSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeChangeSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
@ -1251,15 +1251,15 @@ static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg, char *oldName, char
mInfo("app:%p:%p, stable %s, start to modify column %s to %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, mInfo("app:%p:%p, stable %s, start to modify column %s to %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId,
oldName, newName); oldName, newName);
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.writeCb = mnodeChangeSuperTableColumnCb .fpWrite = mnodeChangeSuperTableColumnCb
}; };
return sdbUpdateRow(&oper); return sdbUpdateRow(&wmsg);
} }
// show super tables // show super tables
@ -1417,12 +1417,12 @@ void mnodeDropAllSuperTables(SDbObj *pDropDb) {
if (pTable == NULL) break; if (pTable == NULL) break;
if (strncmp(prefix, pTable->info.tableId, prefixLen) == 0) { if (strncmp(prefix, pTable->info.tableId, prefixLen) == 0) {
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_LOCAL, .type = SDB_OPER_LOCAL,
.table = tsSuperTableSdb, .pTable = tsSuperTableSdb,
.pObj = pTable, .pObj = pTable,
}; };
sdbDeleteRow(&oper); sdbDeleteRow(&wmsg);
numOfTables ++; numOfTables ++;
} }
@ -1694,7 +1694,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
} else { } else {
mError("app:%p:%p, table:%s, failed to create table sid:%d, uid:%" PRIu64 ", reason:%s", pMsg->rpcMsg.ahandle, pMsg, mError("app:%p:%p, table:%s, failed to create table sid:%d, uid:%" PRIu64 ", reason:%s", pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId, pTable->tid, pTable->uid, tstrerror(code)); pTable->info.tableId, pTable->tid, pTable->uid, tstrerror(code));
SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsChildTableSdb}; SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .pTable = tsChildTableSdb};
sdbDeleteRow(&desc); sdbDeleteRow(&desc);
return code; return code;
} }
@ -1783,9 +1783,9 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
SSWriteMsg desc = { SSWriteMsg desc = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.pObj = pTable, .pObj = pTable,
.table = tsChildTableSdb, .pTable = tsChildTableSdb,
.pMsg = pMsg, .pMsg = pMsg,
.reqFp = mnodeDoCreateChildTableFp .fpReq = mnodeDoCreateChildTableFp
}; };
int32_t code = sdbInsertRow(&desc); int32_t code = sdbInsertRow(&desc);
@ -1901,15 +1901,15 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_APP_ERROR; return TSDB_CODE_MND_APP_ERROR;
} }
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsChildTableSdb, .pTable = tsChildTableSdb,
.pObj = pTable, .pObj = pTable,
.pMsg = pMsg, .pMsg = pMsg,
.writeCb = mnodeDropChildTableCb .fpWrite = mnodeDropChildTableCb
}; };
int32_t code = sdbDeleteRow(&oper); int32_t code = sdbDeleteRow(&wmsg);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("app:%p:%p, ctable:%s, failed to drop, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, mError("app:%p:%p, ctable:%s, failed to drop, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
tstrerror(code)); tstrerror(code));
@ -2005,15 +2005,15 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3
mInfo("app:%p:%p, ctable %s, start to add column", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId); mInfo("app:%p:%p, ctable %s, start to add column", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId);
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsChildTableSdb, .pTable = tsChildTableSdb,
.pObj = pTable, .pObj = pTable,
.pMsg = pMsg, .pMsg = pMsg,
.writeCb = mnodeAlterNormalTableColumnCb .fpWrite = mnodeAlterNormalTableColumnCb
}; };
return sdbUpdateRow(&oper); return sdbUpdateRow(&wmsg);
} }
static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) { static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
@ -2038,15 +2038,15 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
mInfo("app:%p:%p, ctable %s, start to drop column %s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, colName); mInfo("app:%p:%p, ctable %s, start to drop column %s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, colName);
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsChildTableSdb, .pTable = tsChildTableSdb,
.pObj = pTable, .pObj = pTable,
.pMsg = pMsg, .pMsg = pMsg,
.writeCb = mnodeAlterNormalTableColumnCb .fpWrite = mnodeAlterNormalTableColumnCb
}; };
return sdbUpdateRow(&oper); return sdbUpdateRow(&wmsg);
} }
static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char *newName) { static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char *newName) {
@ -2075,15 +2075,15 @@ static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char
mInfo("app:%p:%p, ctable %s, start to modify column %s to %s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, mInfo("app:%p:%p, ctable %s, start to modify column %s to %s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
oldName, newName); oldName, newName);
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsChildTableSdb, .pTable = tsChildTableSdb,
.pObj = pTable, .pObj = pTable,
.pMsg = pMsg, .pMsg = pMsg,
.writeCb = mnodeAlterNormalTableColumnCb .fpWrite = mnodeAlterNormalTableColumnCb
}; };
return sdbUpdateRow(&oper); return sdbUpdateRow(&wmsg);
} }
static int32_t mnodeSetSchemaFromNormalTable(SSchema *pSchema, SCTableObj *pTable) { static int32_t mnodeSetSchemaFromNormalTable(SSchema *pSchema, SCTableObj *pTable) {
@ -2218,12 +2218,12 @@ void mnodeDropAllChildTablesInVgroups(SVgObj *pVgroup) {
if (pTable == NULL) break; if (pTable == NULL) break;
if (pTable->vgId == pVgroup->vgId) { if (pTable->vgId == pVgroup->vgId) {
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_LOCAL, .type = SDB_OPER_LOCAL,
.table = tsChildTableSdb, .pTable = tsChildTableSdb,
.pObj = pTable, .pObj = pTable,
}; };
sdbDeleteRow(&oper); sdbDeleteRow(&wmsg);
numOfTables++; numOfTables++;
} }
mnodeDecTableRef(pTable); mnodeDecTableRef(pTable);
@ -2251,12 +2251,12 @@ void mnodeDropAllChildTables(SDbObj *pDropDb) {
if (pTable == NULL) break; if (pTable == NULL) break;
if (strncmp(prefix, pTable->info.tableId, prefixLen) == 0) { if (strncmp(prefix, pTable->info.tableId, prefixLen) == 0) {
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_LOCAL, .type = SDB_OPER_LOCAL,
.table = tsChildTableSdb, .pTable = tsChildTableSdb,
.pObj = pTable, .pObj = pTable,
}; };
sdbDeleteRow(&oper); sdbDeleteRow(&wmsg);
numOfTables++; numOfTables++;
} }
mnodeDecTableRef(pTable); mnodeDecTableRef(pTable);
@ -2280,12 +2280,12 @@ static void mnodeDropAllChildTablesInStable(SSTableObj *pStable) {
if (pTable == NULL) break; if (pTable == NULL) break;
if (pTable->superTable == pStable) { if (pTable->superTable == pStable) {
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_LOCAL, .type = SDB_OPER_LOCAL,
.table = tsChildTableSdb, .pTable = tsChildTableSdb,
.pObj = pTable, .pObj = pTable,
}; };
sdbDeleteRow(&oper); sdbDeleteRow(&wmsg);
numOfTables++; numOfTables++;
} }
@ -2413,9 +2413,9 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
SSWriteMsg desc = { SSWriteMsg desc = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.pObj = pTable, .pObj = pTable,
.table = tsChildTableSdb, .pTable = tsChildTableSdb,
.pMsg = mnodeMsg, .pMsg = mnodeMsg,
.writeCb = mnodeDoCreateChildTableCb .fpWrite = mnodeDoCreateChildTableCb
}; };
int32_t code = sdbInsertRowImp(&desc); int32_t code = sdbInsertRowImp(&desc);
@ -2440,8 +2440,8 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->tid, pTable->uid, mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->tid, pTable->uid,
tstrerror(rpcMsg->code), mnodeMsg->rpcMsg.handle, mnodeMsg->incomingTs, sec, mnodeMsg->retry); tstrerror(rpcMsg->code), mnodeMsg->rpcMsg.handle, mnodeMsg->incomingTs, sec, mnodeMsg->retry);
SSWriteMsg oper = {.type = SDB_OPER_GLOBAL, .table = tsChildTableSdb, .pObj = pTable}; SSWriteMsg wmsg = {.type = SDB_OPER_GLOBAL, .pTable = tsChildTableSdb, .pObj = pTable};
sdbDeleteRow(&oper); sdbDeleteRow(&wmsg);
if (rpcMsg->code == TSDB_CODE_APP_NOT_READY) { if (rpcMsg->code == TSDB_CODE_APP_NOT_READY) {
//Avoid retry again in client //Avoid retry again in client

View File

@ -42,13 +42,13 @@ static int32_t mnodeProcessAlterUserMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessDropUserMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropUserMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessAuthMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessAuthMsg(SMnodeMsg *pMsg);
static int32_t mnodeUserActionDestroy(SSWriteMsg *pOper) { static int32_t mnodeUserActionDestroy(SSWriteMsg *pWMsg) {
tfree(pOper->pObj); tfree(pWMsg->pObj);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeUserActionInsert(SSWriteMsg *pOper) { static int32_t mnodeUserActionInsert(SSWriteMsg *pWMsg) {
SUserObj *pUser = pOper->pObj; SUserObj *pUser = pWMsg->pObj;
SAcctObj *pAcct = mnodeGetAcct(pUser->acct); SAcctObj *pAcct = mnodeGetAcct(pUser->acct);
if (pAcct != NULL) { if (pAcct != NULL) {
@ -62,8 +62,8 @@ static int32_t mnodeUserActionInsert(SSWriteMsg *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeUserActionDelete(SSWriteMsg *pOper) { static int32_t mnodeUserActionDelete(SSWriteMsg *pWMsg) {
SUserObj *pUser = pOper->pObj; SUserObj *pUser = pWMsg->pObj;
SAcctObj *pAcct = mnodeGetAcct(pUser->acct); SAcctObj *pAcct = mnodeGetAcct(pUser->acct);
if (pAcct != NULL) { if (pAcct != NULL) {
@ -74,8 +74,8 @@ static int32_t mnodeUserActionDelete(SSWriteMsg *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeUserActionUpdate(SSWriteMsg *pOper) { static int32_t mnodeUserActionUpdate(SSWriteMsg *pWMsg) {
SUserObj *pUser = pOper->pObj; SUserObj *pUser = pWMsg->pObj;
SUserObj *pSaved = mnodeGetUser(pUser->user); SUserObj *pSaved = mnodeGetUser(pUser->user);
if (pUser != pSaved) { if (pUser != pSaved) {
memcpy(pSaved, pUser, tsUserUpdateSize); memcpy(pSaved, pUser, tsUserUpdateSize);
@ -85,19 +85,19 @@ static int32_t mnodeUserActionUpdate(SSWriteMsg *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeUserActionEncode(SSWriteMsg *pOper) { static int32_t mnodeUserActionEncode(SSWriteMsg *pWMsg) {
SUserObj *pUser = pOper->pObj; SUserObj *pUser = pWMsg->pObj;
memcpy(pOper->rowData, pUser, tsUserUpdateSize); memcpy(pWMsg->rowData, pUser, tsUserUpdateSize);
pOper->rowSize = tsUserUpdateSize; pWMsg->rowSize = tsUserUpdateSize;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeUserActionDecode(SSWriteMsg *pOper) { static int32_t mnodeUserActionDecode(SSWriteMsg *pWMsg) {
SUserObj *pUser = (SUserObj *)calloc(1, sizeof(SUserObj)); SUserObj *pUser = (SUserObj *)calloc(1, sizeof(SUserObj));
if (pUser == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; if (pUser == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
memcpy(pUser, pOper->rowData, tsUserUpdateSize); memcpy(pUser, pWMsg->rowData, tsUserUpdateSize);
pOper->pObj = pUser; pWMsg->pObj = pUser;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -205,14 +205,14 @@ void mnodeDecUserRef(SUserObj *pUser) {
} }
static int32_t mnodeUpdateUser(SUserObj *pUser, void *pMsg) { static int32_t mnodeUpdateUser(SUserObj *pUser, void *pMsg) {
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsUserSdb, .pTable = tsUserSdb,
.pObj = pUser, .pObj = pUser,
.pMsg = pMsg .pMsg = pMsg
}; };
int32_t code = sdbUpdateRow(&oper); int32_t code = sdbUpdateRow(&wmsg);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("user:%s, failed to alter by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code)); mError("user:%s, failed to alter by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code));
} else { } else {
@ -259,15 +259,15 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) {
pUser->superAuth = 1; pUser->superAuth = 1;
} }
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsUserSdb, .pTable = tsUserSdb,
.pObj = pUser, .pObj = pUser,
.rowSize = sizeof(SUserObj), .rowSize = sizeof(SUserObj),
.pMsg = pMsg .pMsg = pMsg
}; };
code = sdbInsertRow(&oper); code = sdbInsertRow(&wmsg);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("user:%s, failed to create by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code)); mError("user:%s, failed to create by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code));
tfree(pUser); tfree(pUser);
@ -279,14 +279,14 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) {
} }
static int32_t mnodeDropUser(SUserObj *pUser, void *pMsg) { static int32_t mnodeDropUser(SUserObj *pUser, void *pMsg) {
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsUserSdb, .pTable = tsUserSdb,
.pObj = pUser, .pObj = pUser,
.pMsg = pMsg .pMsg = pMsg
}; };
int32_t code = sdbDeleteRow(&oper); int32_t code = sdbDeleteRow(&wmsg);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("user:%s, failed to drop by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code)); mError("user:%s, failed to drop by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code));
} else { } else {
@ -562,12 +562,12 @@ void mnodeDropAllUsers(SAcctObj *pAcct) {
if (pUser == NULL) break; if (pUser == NULL) break;
if (strncmp(pUser->acct, pAcct->user, acctNameLen) == 0) { if (strncmp(pUser->acct, pAcct->user, acctNameLen) == 0) {
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_LOCAL, .type = SDB_OPER_LOCAL,
.table = tsUserSdb, .pTable = tsUserSdb,
.pObj = pUser, .pObj = pUser,
}; };
sdbDeleteRow(&oper); sdbDeleteRow(&wmsg);
numOfUsers++; numOfUsers++;
} }

View File

@ -72,13 +72,13 @@ static void mnodeDestroyVgroup(SVgObj *pVgroup) {
tfree(pVgroup); tfree(pVgroup);
} }
static int32_t mnodeVgroupActionDestroy(SSWriteMsg *pOper) { static int32_t mnodeVgroupActionDestroy(SSWriteMsg *pWMsg) {
mnodeDestroyVgroup(pOper->pObj); mnodeDestroyVgroup(pWMsg->pObj);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeVgroupActionInsert(SSWriteMsg *pOper) { static int32_t mnodeVgroupActionInsert(SSWriteMsg *pWMsg) {
SVgObj *pVgroup = pOper->pObj; SVgObj *pVgroup = pWMsg->pObj;
// refer to db // refer to db
SDbObj *pDb = mnodeGetDb(pVgroup->dbName); SDbObj *pDb = mnodeGetDb(pVgroup->dbName);
@ -115,8 +115,8 @@ static int32_t mnodeVgroupActionInsert(SSWriteMsg *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeVgroupActionDelete(SSWriteMsg *pOper) { static int32_t mnodeVgroupActionDelete(SSWriteMsg *pWMsg) {
SVgObj *pVgroup = pOper->pObj; SVgObj *pVgroup = pWMsg->pObj;
if (pVgroup->pDb == NULL) { if (pVgroup->pDb == NULL) {
mError("vgId:%d, db:%s is not exist while insert into hash", pVgroup->vgId, pVgroup->dbName); mError("vgId:%d, db:%s is not exist while insert into hash", pVgroup->vgId, pVgroup->dbName);
@ -137,8 +137,8 @@ static int32_t mnodeVgroupActionDelete(SSWriteMsg *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeVgroupActionUpdate(SSWriteMsg *pOper) { static int32_t mnodeVgroupActionUpdate(SSWriteMsg *pWMsg) {
SVgObj *pNew = pOper->pObj; SVgObj *pNew = pWMsg->pObj;
SVgObj *pVgroup = mnodeGetVgroup(pNew->vgId); SVgObj *pVgroup = mnodeGetVgroup(pNew->vgId);
if (pVgroup != pNew) { if (pVgroup != pNew) {
@ -176,25 +176,25 @@ static int32_t mnodeVgroupActionUpdate(SSWriteMsg *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeVgroupActionEncode(SSWriteMsg *pOper) { static int32_t mnodeVgroupActionEncode(SSWriteMsg *pWMsg) {
SVgObj *pVgroup = pOper->pObj; SVgObj *pVgroup = pWMsg->pObj;
memcpy(pOper->rowData, pVgroup, tsVgUpdateSize); memcpy(pWMsg->rowData, pVgroup, tsVgUpdateSize);
SVgObj *pTmpVgroup = pOper->rowData; SVgObj *pTmpVgroup = pWMsg->rowData;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
pTmpVgroup->vnodeGid[i].pDnode = NULL; pTmpVgroup->vnodeGid[i].pDnode = NULL;
pTmpVgroup->vnodeGid[i].role = 0; pTmpVgroup->vnodeGid[i].role = 0;
} }
pOper->rowSize = tsVgUpdateSize; pWMsg->rowSize = tsVgUpdateSize;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mnodeVgroupActionDecode(SSWriteMsg *pOper) { static int32_t mnodeVgroupActionDecode(SSWriteMsg *pWMsg) {
SVgObj *pVgroup = (SVgObj *) calloc(1, sizeof(SVgObj)); SVgObj *pVgroup = (SVgObj *) calloc(1, sizeof(SVgObj));
if (pVgroup == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; if (pVgroup == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
memcpy(pVgroup, pOper->rowData, tsVgUpdateSize); memcpy(pVgroup, pWMsg->rowData, tsVgUpdateSize);
pOper->pObj = pVgroup; pWMsg->pObj = pVgroup;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -253,13 +253,13 @@ SVgObj *mnodeGetVgroup(int32_t vgId) {
} }
void mnodeUpdateVgroup(SVgObj *pVgroup) { void mnodeUpdateVgroup(SVgObj *pVgroup) {
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsVgroupSdb, .pTable = tsVgroupSdb,
.pObj = pVgroup .pObj = pVgroup
}; };
int32_t code = sdbUpdateRow(&oper); int32_t code = sdbUpdateRow(&wmsg);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("vgId:%d, failed to update vgroup", pVgroup->vgId); mError("vgId:%d, failed to update vgroup", pVgroup->vgId);
} }
@ -519,14 +519,14 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
mError("app:%p:%p, vgId:%d, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, mError("app:%p:%p, vgId:%d, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
tstrerror(code)); tstrerror(code));
SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb}; SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .pTable = tsVgroupSdb};
sdbDeleteRow(&desc); sdbDeleteRow(&desc);
return code; return code;
} else { } else {
mInfo("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, mInfo("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
pDb->name, pVgroup->numOfVnodes); pDb->name, pVgroup->numOfVnodes);
pVgroup->status = TAOS_VG_STATUS_READY; pVgroup->status = TAOS_VG_STATUS_READY;
SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb}; SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .pTable = tsVgroupSdb};
(void)sdbUpdateRow(&desc); (void)sdbUpdateRow(&desc);
dnodeReprocessMWriteMsg(pMsg); dnodeReprocessMWriteMsg(pMsg);
@ -535,7 +535,7 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
// mInfo("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, // mInfo("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
// pDb->name, pVgroup->numOfVnodes); // pDb->name, pVgroup->numOfVnodes);
// pVgroup->status = TAOS_VG_STATUS_READY; // pVgroup->status = TAOS_VG_STATUS_READY;
// SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb}; // SSWriteMsg desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .pTable = tsVgroupSdb};
// (void)sdbUpdateRow(&desc); // (void)sdbUpdateRow(&desc);
// dnodeReprocessMWriteMsg(pMsg); // dnodeReprocessMWriteMsg(pMsg);
// return TSDB_CODE_MND_ACTION_IN_PROGRESS; // return TSDB_CODE_MND_ACTION_IN_PROGRESS;
@ -571,16 +571,16 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg) {
pMsg->pVgroup = pVgroup; pMsg->pVgroup = pVgroup;
mnodeIncVgroupRef(pVgroup); mnodeIncVgroupRef(pVgroup);
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsVgroupSdb, .pTable = tsVgroupSdb,
.pObj = pVgroup, .pObj = pVgroup,
.rowSize = sizeof(SVgObj), .rowSize = sizeof(SVgObj),
.pMsg = pMsg, .pMsg = pMsg,
.reqFp = mnodeCreateVgroupFp .fpReq = mnodeCreateVgroupFp
}; };
code = sdbInsertRow(&oper); code = sdbInsertRow(&wmsg);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
pMsg->pVgroup = NULL; pMsg->pVgroup = NULL;
mnodeDestroyVgroup(pVgroup); mnodeDestroyVgroup(pVgroup);
@ -595,12 +595,12 @@ void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle) {
} else { } else {
mDebug("vgId:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes); mDebug("vgId:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes);
mnodeSendDropVgroupMsg(pVgroup, NULL); mnodeSendDropVgroupMsg(pVgroup, NULL);
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsVgroupSdb, .pTable = tsVgroupSdb,
.pObj = pVgroup .pObj = pVgroup
}; };
sdbDeleteRow(&oper); sdbDeleteRow(&wmsg);
} }
} }
@ -957,28 +957,28 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
if (mnodeMsg->received != mnodeMsg->expected) return; if (mnodeMsg->received != mnodeMsg->expected) return;
if (mnodeMsg->received == mnodeMsg->successed) { if (mnodeMsg->received == mnodeMsg->successed) {
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsVgroupSdb, .pTable = tsVgroupSdb,
.pObj = pVgroup, .pObj = pVgroup,
.rowSize = sizeof(SVgObj), .rowSize = sizeof(SVgObj),
.pMsg = mnodeMsg, .pMsg = mnodeMsg,
.writeCb = mnodeCreateVgroupCb .fpWrite = mnodeCreateVgroupCb
}; };
int32_t code = sdbInsertRowImp(&oper); int32_t code = sdbInsertRowImp(&wmsg);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mnodeMsg->pVgroup = NULL; mnodeMsg->pVgroup = NULL;
mnodeDestroyVgroup(pVgroup); mnodeDestroyVgroup(pVgroup);
dnodeSendRpcMWriteRsp(mnodeMsg, code); dnodeSendRpcMWriteRsp(mnodeMsg, code);
} }
} else { } else {
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsVgroupSdb, .pTable = tsVgroupSdb,
.pObj = pVgroup .pObj = pVgroup
}; };
sdbDeleteRow(&oper); sdbDeleteRow(&wmsg);
dnodeSendRpcMWriteRsp(mnodeMsg, mnodeMsg->code); dnodeSendRpcMWriteRsp(mnodeMsg, mnodeMsg->code);
} }
} }
@ -1031,12 +1031,12 @@ static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
if (mnodeMsg->received != mnodeMsg->expected) return; if (mnodeMsg->received != mnodeMsg->expected) return;
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsVgroupSdb, .pTable = tsVgroupSdb,
.pObj = pVgroup .pObj = pVgroup
}; };
int32_t code = sdbDeleteRow(&oper); int32_t code = sdbDeleteRow(&wmsg);
if (code != 0) { if (code != 0) {
code = TSDB_CODE_MND_SDB_ERROR; code = TSDB_CODE_MND_SDB_ERROR;
} }
@ -1084,12 +1084,12 @@ void mnodeDropAllDnodeVgroups(SDnodeObj *pDropDnode) {
if (pVgroup->vnodeGid[0].dnodeId == pDropDnode->dnodeId) { if (pVgroup->vnodeGid[0].dnodeId == pDropDnode->dnodeId) {
mnodeDropAllChildTablesInVgroups(pVgroup); mnodeDropAllChildTablesInVgroups(pVgroup);
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_LOCAL, .type = SDB_OPER_LOCAL,
.table = tsVgroupSdb, .pTable = tsVgroupSdb,
.pObj = pVgroup, .pObj = pVgroup,
}; };
sdbDeleteRow(&oper); sdbDeleteRow(&wmsg);
numOfVgroups++; numOfVgroups++;
} }
mnodeDecVgroupRef(pVgroup); mnodeDecVgroupRef(pVgroup);
@ -1135,12 +1135,12 @@ void mnodeDropAllDbVgroups(SDbObj *pDropDb) {
if (pVgroup == NULL) break; if (pVgroup == NULL) break;
if (pVgroup->pDb == pDropDb) { if (pVgroup->pDb == pDropDb) {
SSWriteMsg oper = { SSWriteMsg wmsg = {
.type = SDB_OPER_LOCAL, .type = SDB_OPER_LOCAL,
.table = tsVgroupSdb, .pTable = tsVgroupSdb,
.pObj = pVgroup, .pObj = pVgroup,
}; };
sdbDeleteRow(&oper); sdbDeleteRow(&wmsg);
numOfVgroups++; numOfVgroups++;
} }

View File

@ -113,22 +113,22 @@ echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "debugFlag 0" >> $TAOS_CFG echo "debugFlag 0" >> $TAOS_CFG
echo "mDebugFlag 143" >> $TAOS_CFG echo "mDebugFlag 143" >> $TAOS_CFG
echo "sdbDebugFlag 143" >> $TAOS_CFG echo "sdbDebugFlag 143" >> $TAOS_CFG
echo "dDebugFlag 143" >> $TAOS_CFG echo "dDebugFlag 131" >> $TAOS_CFG
echo "vDebugFlag 143" >> $TAOS_CFG echo "vDebugFlag 131" >> $TAOS_CFG
echo "tsdbDebugFlag 143" >> $TAOS_CFG echo "tsdbDebugFlag 131" >> $TAOS_CFG
echo "cDebugFlag 143" >> $TAOS_CFG echo "cDebugFlag 131" >> $TAOS_CFG
echo "jnidebugFlag 143" >> $TAOS_CFG echo "jnidebugFlag 131" >> $TAOS_CFG
echo "odbcdebugFlag 143" >> $TAOS_CFG echo "odbcdebugFlag 131" >> $TAOS_CFG
echo "httpDebugFlag 143" >> $TAOS_CFG echo "httpDebugFlag 131" >> $TAOS_CFG
echo "monitorDebugFlag 143" >> $TAOS_CFG echo "monitorDebugFlag 131" >> $TAOS_CFG
echo "mqttDebugFlag 143" >> $TAOS_CFG echo "mqttDebugFlag 131" >> $TAOS_CFG
echo "qdebugFlag 143" >> $TAOS_CFG echo "qdebugFlag 131" >> $TAOS_CFG
echo "rpcDebugFlag 143" >> $TAOS_CFG echo "rpcDebugFlag 131" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "udebugFlag 143" >> $TAOS_CFG echo "udebugFlag 131" >> $TAOS_CFG
echo "sdebugFlag 143" >> $TAOS_CFG echo "sdebugFlag 143" >> $TAOS_CFG
echo "wdebugFlag 143" >> $TAOS_CFG echo "wdebugFlag 143" >> $TAOS_CFG
echo "cqdebugFlag 143" >> $TAOS_CFG echo "cqdebugFlag 131" >> $TAOS_CFG
echo "monitor 0" >> $TAOS_CFG echo "monitor 0" >> $TAOS_CFG
echo "monitorInterval 1" >> $TAOS_CFG echo "monitorInterval 1" >> $TAOS_CFG
echo "http 0" >> $TAOS_CFG echo "http 0" >> $TAOS_CFG
@ -140,7 +140,7 @@ echo "clog 2" >> $TAOS_CFG
#echo "cache 1" >> $TAOS_CFG #echo "cache 1" >> $TAOS_CFG
echo "days 10" >> $TAOS_CFG echo "days 10" >> $TAOS_CFG
echo "statusInterval 1" >> $TAOS_CFG echo "statusInterval 1" >> $TAOS_CFG
echo "maxVgroupsPerDb 4" >> $TAOS_CFG echo "maxVgroupsPerDb 10" >> $TAOS_CFG
echo "minTablesPerVnode 4" >> $TAOS_CFG echo "minTablesPerVnode 4" >> $TAOS_CFG
echo "maxTablesPerVnode 1000" >> $TAOS_CFG echo "maxTablesPerVnode 1000" >> $TAOS_CFG
echo "tableIncStepPerVnode 10000" >> $TAOS_CFG echo "tableIncStepPerVnode 10000" >> $TAOS_CFG