[TD-570] the first version of sdb write queue
This commit is contained in:
parent
c0a727a7d3
commit
357384f0b1
|
@ -128,6 +128,7 @@ static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) {
|
|||
|
||||
void dnodeSendRpcMnodeWriteRsp(void *pRaw, int32_t code) {
|
||||
SMnodeMsg *pWrite = pRaw;
|
||||
if (pWrite == NULL) return;
|
||||
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return;
|
||||
if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) {
|
||||
dnodeReprocessMnodeWriteMsg(pWrite);
|
||||
|
|
|
@ -113,8 +113,12 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_QUERY_ID, 0, 0x030C, "mnode inva
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_ID, 0, 0x030D, "mnode invalid stream id")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CONN_ID, 0, 0x030E, "mnode invalid connection")
|
||||
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE, 0, 0x0320, "mnode object already there")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_ERROR, 0, 0x0321, "mnode sdb error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE, 0, 0x0320, "[sdb] object already there")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_ERROR, 0, 0x0321, "[sdb] app error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE, 0, 0x0322, "[sdb] invalid table type")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_OBJ_NOT_THERE, 0, 0x0323, "[sdb] object not there")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_INVAID_META_ROW, 0, 0x0324, "[sdb] invalid meta row")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_INVAID_KEY_TYPE, 0, 0x0325, "[sdb] invalid key type")
|
||||
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ALREADY_EXIST, 0, 0x0330, "mnode dnode already exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_NOT_EXIST, 0, 0x0331, "mnode dnode not exist")
|
||||
|
|
|
@ -48,8 +48,10 @@ typedef struct {
|
|||
ESdbOper type;
|
||||
void * table;
|
||||
void * pObj;
|
||||
int32_t rowSize;
|
||||
void * pMnodeMsg;
|
||||
void * rowData;
|
||||
int32_t rowSize;
|
||||
int32_t retCode; // for callback in sdb queue
|
||||
} SSdbOper;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -28,7 +28,7 @@ void * mnodeGetNextUser(void *pIter, SUserObj **pUser);
|
|||
void mnodeIncUserRef(SUserObj *pUser);
|
||||
void mnodeDecUserRef(SUserObj *pUser);
|
||||
SUserObj *mnodeGetUserFromConn(void *pConn);
|
||||
int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass);
|
||||
int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg);
|
||||
void mnodeDropAllUsers(SAcctObj *pAcct);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -78,7 +78,9 @@ static int32_t mnodeAcctActionDecode(SSdbOper *pOper) {
|
|||
}
|
||||
|
||||
static int32_t mnodeAcctActionRestored() {
|
||||
if (dnodeIsFirstDeploy()) {
|
||||
int32_t numOfRows = sdbGetNumOfRows(tsAcctSdb);
|
||||
if (numOfRows <= 0 && dnodeIsFirstDeploy()) {
|
||||
mPrint("dnode first deploy, create root acct");
|
||||
int32_t code = mnodeCreateRootAcct();
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
mError("failed to create root account, reason:%s", tstrerror(code));
|
||||
|
|
|
@ -117,6 +117,7 @@ static int32_t mnodeDnodeActionDecode(SSdbOper *pOper) {
|
|||
static int32_t mnodeDnodeActionRestored() {
|
||||
int32_t numOfRows = sdbGetNumOfRows(tsDnodeSdb);
|
||||
if (numOfRows <= 0 && dnodeIsFirstDeploy()) {
|
||||
mPrint("dnode first deploy, create dnode:%s", tsLocalEp);
|
||||
mnodeCreateDnode(tsLocalEp);
|
||||
SDnodeObj *pDnode = mnodeGetDnodeByEp(tsLocalEp);
|
||||
mnodeAddMnode(pDnode->dnodeId);
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
#include "tsync.h"
|
||||
#include "tglobal.h"
|
||||
#include "dnode.h"
|
||||
#include "mnode.h"
|
||||
#include "mnodeDef.h"
|
||||
#include "mnodeInt.h"
|
||||
#include "mnodeMnode.h"
|
||||
|
@ -83,8 +84,29 @@ typedef struct {
|
|||
void * row;
|
||||
} SSdbRow;
|
||||
|
||||
typedef struct {
|
||||
pthread_t thread;
|
||||
int32_t workerId;
|
||||
} SSdbWriteWorker;
|
||||
|
||||
typedef struct {
|
||||
int32_t num;
|
||||
SSdbWriteWorker *writeWorker;
|
||||
} SSdbWriteWorkerPool;
|
||||
|
||||
static SSdbObject tsSdbObj = {0};
|
||||
static int sdbWrite(void *param, void *data, int type);
|
||||
static taos_qset tsSdbWriteQset;
|
||||
static taos_qall tsSdbWriteQall;
|
||||
static taos_queue tsSdbWriteQueue;
|
||||
static SSdbWriteWorkerPool tsSdbPool;
|
||||
|
||||
static int sdbWrite(void *param, void *data, int type);
|
||||
static int sdbWriteToQueue(void *param, void *data, int type);
|
||||
static void * sdbWorkerFp(void *param);
|
||||
static int32_t sdbInitWriteWorker();
|
||||
static void sdbCleanupWriteWorker();
|
||||
static int32_t sdbAllocWriteQueue();
|
||||
static void sdbFreeWritequeue();
|
||||
|
||||
int32_t sdbGetId(void *handle) {
|
||||
return ((SSdbTable *)handle)->autoIndex;
|
||||
|
@ -302,7 +324,7 @@ void sdbUpdateSync() {
|
|||
syncInfo.ahandle = NULL;
|
||||
syncInfo.getWalInfo = sdbGetWalInfo;
|
||||
syncInfo.getFileInfo = sdbGetFileInfo;
|
||||
syncInfo.writeToCache = sdbWrite;
|
||||
syncInfo.writeToCache = sdbWriteToQueue;
|
||||
syncInfo.confirmForward = sdbConfirmForward;
|
||||
syncInfo.notifyRole = sdbNotifyRole;
|
||||
tsSdbObj.cfg = syncCfg;
|
||||
|
@ -319,6 +341,10 @@ int32_t sdbInit() {
|
|||
pthread_mutex_init(&tsSdbObj.mutex, NULL);
|
||||
sem_init(&tsSdbObj.sem, 0, 0);
|
||||
|
||||
if (sdbInitWriteWorker() != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (sdbInitWal() != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -340,6 +366,8 @@ void sdbCleanUp() {
|
|||
|
||||
tsSdbObj.status = SDB_STATUS_CLOSING;
|
||||
|
||||
sdbCleanupWriteWorker();
|
||||
|
||||
if (tsSdbObj.sync) {
|
||||
syncStop(tsSdbObj.sync);
|
||||
tsSdbObj.sync = NULL;
|
||||
|
@ -494,9 +522,10 @@ static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper) {
|
|||
}
|
||||
|
||||
static int sdbWrite(void *param, void *data, int type) {
|
||||
SSdbOper *pOper = param;
|
||||
SWalHead *pHead = data;
|
||||
int32_t tableId = pHead->msgType / 10;
|
||||
int32_t action = pHead->msgType % 10;
|
||||
int32_t tableId = pHead->msgType / 10;
|
||||
int32_t action = pHead->msgType % 10;
|
||||
|
||||
SSdbTable *pTable = sdbGetTableFromId(tableId);
|
||||
assert(pTable != NULL);
|
||||
|
@ -531,21 +560,22 @@ static int sdbWrite(void *param, void *data, int type) {
|
|||
pthread_mutex_unlock(&tsSdbObj.mutex);
|
||||
return code;
|
||||
}
|
||||
walFsync(tsSdbObj.wal);
|
||||
|
||||
code = sdbForwardToPeer(pHead);
|
||||
pthread_mutex_unlock(&tsSdbObj.mutex);
|
||||
|
||||
// from app, oper is created
|
||||
if (param != NULL) {
|
||||
//sdbTrace("request from app is disposed, version:%" PRIu64 " code:%s", pHead->version, tstrerror(code));
|
||||
if (pOper != NULL) {
|
||||
sdbTrace("record from app is disposed, version:%" PRIu64 " code:%s", pHead->version, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
// from wal or forward msg, oper not created, should add into hash
|
||||
if (tsSdbObj.sync != NULL) {
|
||||
sdbTrace("forward request is received, version:%" PRIu64 " result:%s, confirm it", pHead->version, tstrerror(code));
|
||||
sdbTrace("record from wal forward is disposed, version:%" PRIu64 " confirm it", pHead->version);
|
||||
syncConfirmForward(tsSdbObj.sync, pHead->version, code);
|
||||
} else {
|
||||
sdbTrace("record from wal restore is disposed, version:%" PRIu64 , pHead->version);
|
||||
}
|
||||
|
||||
if (action == SDB_ACTION_INSERT) {
|
||||
|
@ -568,7 +598,7 @@ static int sdbWrite(void *param, void *data, int type) {
|
|||
|
||||
int32_t sdbInsertRow(SSdbOper *pOper) {
|
||||
SSdbTable *pTable = (SSdbTable *)pOper->table;
|
||||
if (pTable == NULL) return -1;
|
||||
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
|
||||
|
||||
if (sdbGetRowFromObj(pTable, pOper->pObj)) {
|
||||
sdbError("table:%s, failed to insert record:%s, already exist", pTable->tableName, sdbGetKeyStrFromObj(pTable, pOper->pObj));
|
||||
|
@ -587,98 +617,132 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
|
|||
pthread_mutex_unlock(&pTable->mutex);
|
||||
}
|
||||
|
||||
if (pOper->type == SDB_OPER_GLOBAL) {
|
||||
int32_t size = sizeof(SWalHead) + pTable->maxRowSize;
|
||||
SWalHead *pHead = taosAllocateQitem(size);
|
||||
pHead->version = 0;
|
||||
pHead->len = pOper->rowSize;
|
||||
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_INSERT;
|
||||
|
||||
pOper->rowData = pHead->cont;
|
||||
(*pTable->encodeFp)(pOper);
|
||||
pHead->len = pOper->rowSize;
|
||||
|
||||
int32_t code = sdbWrite(pOper, pHead, pHead->msgType);
|
||||
taosFreeQitem(pHead);
|
||||
if (code < 0) return code;
|
||||
int32_t code = sdbInsertHash(pTable, pOper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
sdbError("table:%s, failed to insert into hash", pTable->tableName);
|
||||
return code;
|
||||
}
|
||||
|
||||
return sdbInsertHash(pTable, pOper);
|
||||
// just insert data into memory
|
||||
if (pOper->type != SDB_OPER_GLOBAL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize;
|
||||
SSdbOper *pNewOper = taosAllocateQitem(size);
|
||||
|
||||
SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper);
|
||||
pHead->version = 0;
|
||||
pHead->len = pOper->rowSize;
|
||||
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_INSERT;
|
||||
|
||||
pOper->rowData = pHead->cont;
|
||||
(*pTable->encodeFp)(pOper);
|
||||
pHead->len = pOper->rowSize;
|
||||
|
||||
memcpy(pNewOper, pOper, sizeof(SSdbOper));
|
||||
taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t sdbDeleteRow(SSdbOper *pOper) {
|
||||
SSdbTable *pTable = (SSdbTable *)pOper->table;
|
||||
if (pTable == NULL) return -1;
|
||||
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
|
||||
|
||||
SSdbRow *pMeta = sdbGetRowMetaFromObj(pTable, pOper->pObj);
|
||||
if (pMeta == NULL) {
|
||||
sdbTrace("table:%s, record is not there, delete failed", pTable->tableName);
|
||||
return -1;
|
||||
return TSDB_CODE_MND_SDB_OBJ_NOT_THERE;
|
||||
}
|
||||
|
||||
void * pMetaRow = pMeta->row;
|
||||
assert(pMetaRow != NULL);
|
||||
|
||||
if (pOper->type == SDB_OPER_GLOBAL) {
|
||||
void * key = sdbGetObjKey(pTable, pOper->pObj);
|
||||
int32_t keySize = 0;
|
||||
switch (pTable->keyType) {
|
||||
case SDB_KEY_STRING:
|
||||
case SDB_KEY_VAR_STRING:
|
||||
keySize = strlen((char *)key) + 1;
|
||||
break;
|
||||
case SDB_KEY_INT:
|
||||
case SDB_KEY_AUTO:
|
||||
keySize = sizeof(uint32_t);
|
||||
break;
|
||||
default:
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t size = sizeof(SWalHead) + keySize;
|
||||
SWalHead *pHead = taosAllocateQitem(size);
|
||||
pHead->version = 0;
|
||||
pHead->len = keySize;
|
||||
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE;
|
||||
memcpy(pHead->cont, key, keySize);
|
||||
|
||||
int32_t code = sdbWrite(pOper, pHead, pHead->msgType);
|
||||
taosFreeQitem(pHead);
|
||||
if (code < 0) return code;
|
||||
void *pMetaRow = pMeta->row;
|
||||
if (pMetaRow == NULL) {
|
||||
sdbError("table:%s, record meta is null", pTable->tableName);
|
||||
return TSDB_CODE_MND_SDB_INVAID_META_ROW;
|
||||
}
|
||||
|
||||
return sdbDeleteHash(pTable, pOper);
|
||||
int32_t code = sdbDeleteHash(pTable, pOper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
sdbError("table:%s, failed to delete from hash", pTable->tableName);
|
||||
return code;
|
||||
}
|
||||
|
||||
// just delete data from memory
|
||||
if (pOper->type != SDB_OPER_GLOBAL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void * key = sdbGetObjKey(pTable, pOper->pObj);
|
||||
int32_t keySize = 0;
|
||||
switch (pTable->keyType) {
|
||||
case SDB_KEY_STRING:
|
||||
case SDB_KEY_VAR_STRING:
|
||||
keySize = strlen((char *)key) + 1;
|
||||
break;
|
||||
case SDB_KEY_INT:
|
||||
case SDB_KEY_AUTO:
|
||||
keySize = sizeof(uint32_t);
|
||||
break;
|
||||
default:
|
||||
return TSDB_CODE_MND_SDB_INVAID_KEY_TYPE;
|
||||
}
|
||||
|
||||
|
||||
int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + keySize;
|
||||
SSdbOper *pNewOper = taosAllocateQitem(size);
|
||||
|
||||
SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper);
|
||||
pHead->version = 0;
|
||||
pHead->len = keySize;
|
||||
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE;
|
||||
memcpy(pHead->cont, key, keySize);
|
||||
|
||||
memcpy(pNewOper, pOper, sizeof(SSdbOper));
|
||||
taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t sdbUpdateRow(SSdbOper *pOper) {
|
||||
SSdbTable *pTable = (SSdbTable *)pOper->table;
|
||||
if (pTable == NULL) return -1;
|
||||
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
|
||||
|
||||
SSdbRow *pMeta = sdbGetRowMetaFromObj(pTable, pOper->pObj);
|
||||
if (pMeta == NULL) {
|
||||
sdbTrace("table:%s, record is not there, delete failed", pTable->tableName);
|
||||
return -1;
|
||||
sdbTrace("table:%s, record is not there, update failed", pTable->tableName);
|
||||
return TSDB_CODE_MND_SDB_OBJ_NOT_THERE;
|
||||
}
|
||||
|
||||
void * pMetaRow = pMeta->row;
|
||||
assert(pMetaRow != NULL);
|
||||
|
||||
if (pOper->type == SDB_OPER_GLOBAL) {
|
||||
int32_t size = sizeof(SWalHead) + pTable->maxRowSize;
|
||||
SWalHead *pHead = taosAllocateQitem(size);
|
||||
pHead->version = 0;
|
||||
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_UPDATE;
|
||||
|
||||
pOper->rowData = pHead->cont;
|
||||
(*pTable->encodeFp)(pOper);
|
||||
pHead->len = pOper->rowSize;
|
||||
|
||||
int32_t code = sdbWrite(pOper, pHead, pHead->msgType);
|
||||
taosFreeQitem(pHead);
|
||||
if (code < 0) return code;
|
||||
void *pMetaRow = pMeta->row;
|
||||
if (pMetaRow == NULL) {
|
||||
sdbError("table:%s, record meta is null", pTable->tableName);
|
||||
return TSDB_CODE_MND_SDB_INVAID_META_ROW;
|
||||
}
|
||||
|
||||
return sdbUpdateHash(pTable, pOper);
|
||||
int32_t code = sdbUpdateHash(pTable, pOper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
sdbError("table:%s, failed to update hash", pTable->tableName);
|
||||
return code;
|
||||
}
|
||||
|
||||
// just update data in memory
|
||||
if (pOper->type != SDB_OPER_GLOBAL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize;
|
||||
SSdbOper *pNewOper = taosAllocateQitem(size);
|
||||
|
||||
SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper);
|
||||
pHead->version = 0;
|
||||
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_UPDATE;
|
||||
|
||||
pOper->rowData = pHead->cont;
|
||||
(*pTable->encodeFp)(pOper);
|
||||
pHead->len = pOper->rowSize;
|
||||
|
||||
memcpy(pNewOper, pOper, sizeof(SSdbOper));
|
||||
taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void *sdbFetchRow(void *handle, void *pNode, void **ppRow) {
|
||||
|
@ -775,3 +839,147 @@ void sdbCloseTable(void *handle) {
|
|||
free(pTable);
|
||||
}
|
||||
|
||||
int32_t sdbInitWriteWorker() {
|
||||
tsSdbPool.num = 1;
|
||||
tsSdbPool.writeWorker = (SSdbWriteWorker *)calloc(sizeof(SSdbWriteWorker), tsSdbPool.num);
|
||||
|
||||
if (tsSdbPool.writeWorker == NULL) return -1;
|
||||
for (int32_t i = 0; i < tsSdbPool.num; ++i) {
|
||||
SSdbWriteWorker *pWorker = tsSdbPool.writeWorker + i;
|
||||
pWorker->workerId = i;
|
||||
}
|
||||
|
||||
sdbAllocWriteQueue();
|
||||
|
||||
mPrint("sdb write is opened");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void sdbCleanupWriteWorker() {
|
||||
for (int32_t i = 0; i < tsSdbPool.num; ++i) {
|
||||
SSdbWriteWorker *pWorker = tsSdbPool.writeWorker + i;
|
||||
if (pWorker->thread) {
|
||||
taosQsetThreadResume(tsSdbWriteQset);
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tsSdbPool.num; ++i) {
|
||||
SSdbWriteWorker *pWorker = tsSdbPool.writeWorker + i;
|
||||
if (pWorker->thread) {
|
||||
pthread_join(pWorker->thread, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
sdbFreeWritequeue();
|
||||
|
||||
mPrint("sdb write is closed");
|
||||
}
|
||||
|
||||
int32_t sdbAllocWriteQueue() {
|
||||
tsSdbWriteQueue = taosOpenQueue();
|
||||
if (tsSdbWriteQueue == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
||||
tsSdbWriteQset = taosOpenQset();
|
||||
if (tsSdbWriteQset == NULL) {
|
||||
taosCloseQueue(tsSdbWriteQueue);
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
}
|
||||
taosAddIntoQset(tsSdbWriteQset, tsSdbWriteQueue, NULL);
|
||||
|
||||
tsSdbWriteQall = taosAllocateQall();
|
||||
if (tsSdbWriteQall == NULL) {
|
||||
taosCloseQset(tsSdbWriteQset);
|
||||
taosCloseQueue(tsSdbWriteQueue);
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tsSdbPool.num; ++i) {
|
||||
SSdbWriteWorker *pWorker = tsSdbPool.writeWorker + i;
|
||||
pWorker->workerId = i;
|
||||
|
||||
pthread_attr_t thAttr;
|
||||
pthread_attr_init(&thAttr);
|
||||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
if (pthread_create(&pWorker->thread, &thAttr, sdbWorkerFp, pWorker) != 0) {
|
||||
mError("failed to create thread to process sdb write queue, reason:%s", strerror(errno));
|
||||
taosFreeQall(tsSdbWriteQall);
|
||||
taosCloseQset(tsSdbWriteQset);
|
||||
taosCloseQueue(tsSdbWriteQueue);
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pthread_attr_destroy(&thAttr);
|
||||
mTrace("sdb write worker:%d is launched, total:%d", pWorker->workerId, tsSdbPool.num);
|
||||
}
|
||||
|
||||
mTrace("sdb write queue:%p is allocated", tsSdbWriteQueue);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void sdbFreeWritequeue() {
|
||||
taosCloseQset(tsSdbWriteQueue);
|
||||
taosFreeQall(tsSdbWriteQall);
|
||||
taosCloseQset(tsSdbWriteQset);
|
||||
tsSdbWriteQall = NULL;
|
||||
tsSdbWriteQset = NULL;
|
||||
tsSdbWriteQueue = NULL;
|
||||
}
|
||||
|
||||
int sdbWriteToQueue(void *param, void *data, int type) {
|
||||
SWalHead *pHead = data;
|
||||
int size = sizeof(SWalHead) + pHead->len;
|
||||
SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
|
||||
memcpy(pWal, pHead, size);
|
||||
|
||||
taosWriteQitem(tsSdbWriteQueue, type, pWal);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void *sdbWorkerFp(void *param) {
|
||||
SWalHead *pHead;
|
||||
SSdbOper *pOper;
|
||||
int32_t type;
|
||||
int32_t numOfMsgs;
|
||||
void * item;
|
||||
void * unUsed;
|
||||
|
||||
while (1) {
|
||||
numOfMsgs = taosReadAllQitemsFromQset(tsSdbWriteQset, tsSdbWriteQall, &unUsed);
|
||||
if (numOfMsgs == 0) {
|
||||
sdbTrace("sdbWorkerFp: got no message from qset, exiting...");
|
||||
break;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(tsSdbWriteQall, &type, &item);
|
||||
if (type == TAOS_QTYPE_RPC) {
|
||||
pOper = (SSdbOper *)item;
|
||||
pHead = (void *)pOper + sizeof(SSdbOper);
|
||||
} else {
|
||||
pHead = (SWalHead *)item;
|
||||
pOper = NULL;
|
||||
}
|
||||
|
||||
int32_t code = sdbWrite(pOper, pHead, type);
|
||||
if (pOper && code != TSDB_CODE_SUCCESS) {
|
||||
pOper->retCode = code;
|
||||
}
|
||||
}
|
||||
|
||||
walFsync(tsSdbObj.wal);
|
||||
|
||||
// browse all items, and process them one by one
|
||||
taosResetQitems(tsSdbWriteQall);
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(tsSdbWriteQall, &type, &item);
|
||||
if (type == TAOS_QTYPE_RPC) {
|
||||
pOper = (SSdbOper *)item;
|
||||
dnodeSendRpcMnodeWriteRsp(pOper->pMnodeMsg, pOper->retCode);
|
||||
}
|
||||
taosFreeQitem(item);
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -102,11 +102,13 @@ static int32_t mnodeUserActionDecode(SSdbOper *pOper) {
|
|||
}
|
||||
|
||||
static int32_t mnodeUserActionRestored() {
|
||||
if (dnodeIsFirstDeploy()) {
|
||||
int32_t numOfRows = sdbGetNumOfRows(tsUserSdb);
|
||||
if (numOfRows <= 0 && dnodeIsFirstDeploy()) {
|
||||
mPrint("dnode first deploy, create root user");
|
||||
SAcctObj *pAcct = mnodeGetAcct("root");
|
||||
mnodeCreateUser(pAcct, "root", "taosdata");
|
||||
mnodeCreateUser(pAcct, "monitor", tsInternalPass);
|
||||
mnodeCreateUser(pAcct, "_root", tsInternalPass);
|
||||
mnodeCreateUser(pAcct, "root", "taosdata", NULL);
|
||||
mnodeCreateUser(pAcct, "monitor", tsInternalPass, NULL);
|
||||
mnodeCreateUser(pAcct, "_root", tsInternalPass, NULL);
|
||||
mnodeDecAcctRef(pAcct);
|
||||
}
|
||||
|
||||
|
@ -185,7 +187,7 @@ static int32_t mnodeUpdateUser(SUserObj *pUser) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass) {
|
||||
int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) {
|
||||
int32_t code = acctCheck(pAcct, ACCT_GRANT_USER);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -226,16 +228,17 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass) {
|
|||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsUserSdb,
|
||||
.pObj = pUser,
|
||||
.rowSize = sizeof(SUserObj)
|
||||
.rowSize = sizeof(SUserObj),
|
||||
.pMnodeMsg = pMsg
|
||||
};
|
||||
|
||||
code = sdbInsertRow(&oper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tfree(pUser);
|
||||
code = TSDB_CODE_MND_SDB_ERROR;
|
||||
return code;
|
||||
} else {
|
||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mnodeDropUser(SUserObj *pUser) {
|
||||
|
@ -364,7 +367,7 @@ static int32_t mnodeProcessCreateUserMsg(SMnodeMsg *pMsg) {
|
|||
|
||||
if (pOperUser->superAuth) {
|
||||
SCMCreateUserMsg *pCreate = pMsg->rpcMsg.pCont;
|
||||
code = mnodeCreateUser(pOperUser->pAcct, pCreate->user, pCreate->pass);
|
||||
code = mnodeCreateUser(pOperUser->pAcct, pCreate->user, pCreate->pass, pMsg);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
mLPrint("user:%s, is created by %s", pCreate->user, pOperUser->user);
|
||||
}
|
||||
|
|
|
@ -20,10 +20,10 @@ system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4
|
|||
system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4
|
||||
system sh/cfg.sh -n dnode4 -c mnodeEqualVnodeNum -v 4
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4
|
||||
system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4
|
||||
system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4
|
||||
system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4
|
||||
system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 20
|
||||
system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 20
|
||||
system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 20
|
||||
system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 20
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c http -v 1
|
||||
system sh/cfg.sh -n dnode2 -c http -v 1
|
||||
|
|
Loading…
Reference in New Issue