[TD-52] refact sdb code
This commit is contained in:
parent
34a3ac2436
commit
5b33531389
|
@ -33,13 +33,13 @@ typedef enum {
|
|||
} ESdbTable;
|
||||
|
||||
typedef enum {
|
||||
SDB_KEY_TYPE_STRING,
|
||||
SDB_KEY_TYPE_AUTO
|
||||
SDB_KEY_STRING,
|
||||
SDB_KEY_AUTO
|
||||
} ESdbKeyType;
|
||||
|
||||
typedef enum {
|
||||
SDB_OPER_TYPE_GLOBAL,
|
||||
SDB_OPER_TYPE_LOCAL
|
||||
SDB_OPER_GLOBAL,
|
||||
SDB_OPER_LOCAL
|
||||
} ESdbOperType;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -116,7 +116,7 @@ int32_t mgmtInitDbs() {
|
|||
.hashSessions = TSDB_MAX_DBS,
|
||||
.maxRowSize = tsDbUpdateSize,
|
||||
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
|
||||
.keyType = SDB_KEY_TYPE_STRING,
|
||||
.keyType = SDB_KEY_STRING,
|
||||
.insertFp = mgmtDbActionInsert,
|
||||
.deleteFp = mgmtDbActionDelete,
|
||||
.updateFp = mgmtDbActionUpdate,
|
||||
|
@ -311,7 +311,7 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) {
|
|||
pDb->cfg = *pCreate;
|
||||
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsDbSdb,
|
||||
.pObj = pDb,
|
||||
.rowSize = sizeof(SDbObj)
|
||||
|
@ -664,7 +664,7 @@ static int32_t mgmtSetDbDropping(SDbObj *pDb) {
|
|||
|
||||
pDb->status = true;
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsDbSdb,
|
||||
.pObj = pDb,
|
||||
.rowSize = tsDbUpdateSize
|
||||
|
@ -749,7 +749,7 @@ static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
|
|||
if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) {
|
||||
pDb->cfg = newCfg;
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsDbSdb,
|
||||
.pObj = pDb,
|
||||
.rowSize = tsDbUpdateSize
|
||||
|
@ -807,7 +807,7 @@ static void mgmtDropDb(SQueuedMsg *pMsg) {
|
|||
mPrint("db:%s, drop db from sdb", pDb->name);
|
||||
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsDbSdb,
|
||||
.pObj = pDb
|
||||
};
|
||||
|
|
|
@ -31,8 +31,8 @@
|
|||
typedef struct {
|
||||
int32_t code;
|
||||
int64_t version;
|
||||
void * pSync;
|
||||
void * pWal;
|
||||
void * sync;
|
||||
void * wal;
|
||||
sem_t sem;
|
||||
pthread_mutex_t mutex;
|
||||
} SSdbSync;
|
||||
|
@ -99,13 +99,13 @@ static char *sdbGetActionStr(int32_t action) {
|
|||
static char *sdbGetkeyStr(SSdbTable *pTable, void *row) {
|
||||
static char str[16];
|
||||
switch (pTable->keyType) {
|
||||
case SDB_KEY_TYPE_STRING:
|
||||
case SDB_KEY_STRING:
|
||||
return (char *)row;
|
||||
case SDB_KEY_TYPE_AUTO:
|
||||
case SDB_KEY_AUTO:
|
||||
sprintf(str, "%d", *(int32_t *)row);
|
||||
return str;
|
||||
default:
|
||||
return "unknown";
|
||||
return "invalid";
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -135,14 +135,14 @@ int32_t sdbInit() {
|
|||
pthread_mutex_init(&tsSdbSync->mutex, NULL);
|
||||
|
||||
SWalCfg walCfg = {.commitLog = 2, .wals = 2, .keep = 1};
|
||||
tsSdbSync->pWal = walOpen(tsMnodeDir, &walCfg);
|
||||
if (tsSdbSync->pWal == NULL) {
|
||||
tsSdbSync->wal = walOpen(tsMnodeDir, &walCfg);
|
||||
if (tsSdbSync->wal == NULL) {
|
||||
sdbError("failed to open sdb in %s", tsMnodeDir);
|
||||
return -1;
|
||||
}
|
||||
|
||||
sdbTrace("open sdb file for read");
|
||||
walRestore(tsSdbSync->pWal, tsSdbSync, sdbProcessWrite);
|
||||
walRestore(tsSdbSync->wal, tsSdbSync, sdbProcessWrite);
|
||||
|
||||
int32_t totalRows = 0;
|
||||
int32_t numOfTables = 0;
|
||||
|
@ -166,7 +166,7 @@ void sdbCleanUp() {
|
|||
if (tsSdbSync) {
|
||||
sem_destroy(&tsSdbSync->sem);
|
||||
pthread_mutex_destroy(&tsSdbSync->mutex);
|
||||
walClose(tsSdbSync->pWal);
|
||||
walClose(tsSdbSync->wal);
|
||||
tsSdbSync = NULL;
|
||||
}
|
||||
}
|
||||
|
@ -174,25 +174,28 @@ void sdbCleanUp() {
|
|||
void sdbIncRef(void *handle, void *pRow) {
|
||||
if (pRow) {
|
||||
SSdbTable *pTable = handle;
|
||||
int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos);
|
||||
int32_t * pRefCount = (int32_t *)(pRow + pTable->refCountPos);
|
||||
atomic_add_fetch_32(pRefCount, 1);
|
||||
//if (0 && strcmp(pTable->tableName, "dnodes") == 0) {
|
||||
sdbTrace("table:%s, add ref to record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount);
|
||||
//}
|
||||
if (0 && strcmp(pTable->tableName, "dnodes") == 0) {
|
||||
sdbTrace("table:%s, add ref to record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow),
|
||||
*pRefCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void sdbDecRef(void *handle, void *pRow) {
|
||||
if (pRow) {
|
||||
SSdbTable *pTable = handle;
|
||||
int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos);
|
||||
int32_t refCount = atomic_sub_fetch_32(pRefCount, 1);
|
||||
//if (0 && strcmp(pTable->tableName, "dnodes") == 0) {
|
||||
sdbTrace("table:%s, def ref of record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount);
|
||||
//}
|
||||
int8_t* updateEnd = pRow + pTable->refCountPos - 1;
|
||||
int32_t * pRefCount = (int32_t *)(pRow + pTable->refCountPos);
|
||||
int32_t refCount = atomic_sub_fetch_32(pRefCount, 1);
|
||||
if (0 && strcmp(pTable->tableName, "dnodes") == 0) {
|
||||
sdbTrace("table:%s, def ref of record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow),
|
||||
*pRefCount);
|
||||
}
|
||||
int8_t *updateEnd = pRow + pTable->refCountPos - 1;
|
||||
if (refCount <= 0 && *updateEnd) {
|
||||
sdbTrace("table:%s, record:%s:%s:%d is destroyed", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount);
|
||||
sdbTrace("table:%s, record:%s:%s:%d is destroyed", pTable->tableName, pTable->tableName,
|
||||
sdbGetkeyStr(pTable, pRow), *pRefCount);
|
||||
SSdbOperDesc oper = {.pObj = pRow};
|
||||
(*pTable->destroyFp)(&oper);
|
||||
}
|
||||
|
@ -228,7 +231,7 @@ void *sdbGetRow(void *handle, void *key) {
|
|||
return pMeta->row;
|
||||
}
|
||||
|
||||
static int32_t sdbInsertLocal(SSdbTable* pTable, SSdbOperDesc *pOper) {
|
||||
static int32_t sdbInsertLocal(SSdbTable *pTable, SSdbOperDesc *pOper) {
|
||||
SRowMeta rowMeta;
|
||||
rowMeta.rowSize = pOper->rowSize;
|
||||
rowMeta.row = pOper->pObj;
|
||||
|
@ -239,41 +242,144 @@ static int32_t sdbInsertLocal(SSdbTable* pTable, SSdbOperDesc *pOper) {
|
|||
pTable->numOfRows++;
|
||||
pthread_mutex_unlock(&pTable->mutex);
|
||||
|
||||
sdbTrace("table:%s, insert record:%s, numOfRows:%d", pTable->tableName,
|
||||
sdbGetkeyStr(pTable, pOper->pObj), pTable->numOfRows);
|
||||
sdbTrace("table:%s, insert record:%s, numOfRows:%d", pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj),
|
||||
pTable->numOfRows);
|
||||
|
||||
(*pTable->insertFp)(pOper);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t sdbDeleteLocal(SSdbTable* pTable, SSdbOperDesc *pOper) {
|
||||
static int32_t sdbDeleteLocal(SSdbTable *pTable, SSdbOperDesc *pOper) {
|
||||
pthread_mutex_lock(&pTable->mutex);
|
||||
(*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, pOper->pObj);
|
||||
pTable->numOfRows--;
|
||||
pthread_mutex_unlock(&pTable->mutex);
|
||||
|
||||
sdbTrace("table:%s, delete record:%s, numOfRows:%d", pTable->tableName,
|
||||
sdbGetkeyStr(pTable, pOper->pObj), pTable->numOfRows);
|
||||
sdbTrace("table:%s, delete record:%s, numOfRows:%d", pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj),
|
||||
pTable->numOfRows);
|
||||
|
||||
(*pTable->deleteFp)(pOper);
|
||||
int8_t* updateEnd = pOper->pObj + pTable->refCountPos - 1;
|
||||
int8_t *updateEnd = pOper->pObj + pTable->refCountPos - 1;
|
||||
*updateEnd = 1;
|
||||
sdbDecRef(pTable, pOper->pObj);
|
||||
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t sdbUpdateLocal(SSdbTable* pTable, SSdbOperDesc *pOper) {
|
||||
sdbTrace("table:%s, update record:%s, numOfRows:%d", pTable->tableName,
|
||||
sdbGetkeyStr(pTable, pOper->pObj), pTable->numOfRows);
|
||||
static int32_t sdbUpdateLocal(SSdbTable *pTable, SSdbOperDesc *pOper) {
|
||||
sdbTrace("table:%s, update record:%s, numOfRows:%d", pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj),
|
||||
pTable->numOfRows);
|
||||
|
||||
(*pTable->updateFp)(pOper);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t sdbProcessWriteFromApp(SSdbTable *pTable, SWalHead *pHead, int32_t action) {
|
||||
int32_t code = 0;
|
||||
|
||||
pthread_mutex_lock(&tsSdbSync->mutex);
|
||||
tsSdbSync->version++;
|
||||
pHead->version = tsSdbSync->version;
|
||||
|
||||
code = sdbForwardDbReqToPeer(pHead);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pthread_mutex_unlock(&tsSdbSync->mutex);
|
||||
sdbError("table:%s, failed to forward %s record:%s from file, version:%" PRId64 ", reason:%s", pTable->tableName,
|
||||
sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
code = walWrite(tsSdbSync->wal, pHead);
|
||||
pthread_mutex_unlock(&tsSdbSync->mutex);
|
||||
|
||||
if (code < 0) {
|
||||
sdbError("table:%s, failed to %s record:%s to file, version:%" PRId64 ", reason:%s", pTable->tableName,
|
||||
sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version, tstrerror(code));
|
||||
} else {
|
||||
sdbTrace("table:%s, success to %s record:%s to file, version:%" PRId64, pTable->tableName, sdbGetActionStr(action),
|
||||
sdbGetkeyStr(pTable, pHead->cont), pHead->version);
|
||||
}
|
||||
|
||||
walFsync(tsSdbSync->wal);
|
||||
free(pHead);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t sdbProcessWriteFromWal(SSdbTable *pTable, SWalHead *pHead, int32_t action) {
|
||||
pthread_mutex_lock(&tsSdbSync->mutex);
|
||||
if (pHead->version <= tsSdbSync->version) {
|
||||
pthread_mutex_unlock(&tsSdbSync->mutex);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (pHead->version != tsSdbSync->version + 1) {
|
||||
pthread_mutex_unlock(&tsSdbSync->mutex);
|
||||
sdbError("table:%s, failed to restore %s record:%s from file, version:%" PRId64 " too large, sdb version:%" PRId64,
|
||||
pTable->tableName, sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version,
|
||||
tsSdbSync->version);
|
||||
return TSDB_CODE_OTHERS;
|
||||
}
|
||||
|
||||
tsSdbSync->version = pHead->version;
|
||||
sdbTrace("table:%s, success to restore %s record:%s from file, version:%" PRId64, pTable->tableName,
|
||||
sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version);
|
||||
|
||||
int32_t code = -1;
|
||||
if (action == SDB_ACTION_INSERT) {
|
||||
SSdbOperDesc oper = {
|
||||
.rowSize = pHead->len,
|
||||
.rowData = pHead->cont,
|
||||
.table = pTable,
|
||||
};
|
||||
code = (*pTable->decodeFp)(&oper);
|
||||
if (code < 0) {
|
||||
sdbTrace("table:%s, failed to decode %s record:%s from file, version:%" PRId64, pTable->tableName,
|
||||
sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version);
|
||||
pthread_mutex_unlock(&tsSdbSync->mutex);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = sdbInsertLocal(pTable, &oper);
|
||||
} else if (action == SDB_ACTION_DELETE) {
|
||||
SRowMeta *rowMeta = sdbGetRowMeta(pTable, pHead->cont);
|
||||
assert(rowMeta != NULL && rowMeta->row != NULL);
|
||||
|
||||
SSdbOperDesc oper = {
|
||||
.table = pTable,
|
||||
.pObj = rowMeta->row,
|
||||
};
|
||||
|
||||
code = sdbDeleteLocal(pTable, &oper);
|
||||
} else if (action == SDB_ACTION_UPDATE) {
|
||||
SRowMeta *rowMeta = sdbGetRowMeta(pTable, pHead->cont);
|
||||
assert(rowMeta != NULL && rowMeta->row != NULL);
|
||||
|
||||
SSdbOperDesc oper1 = {
|
||||
.table = pTable,
|
||||
.pObj = rowMeta->row,
|
||||
};
|
||||
sdbDeleteLocal(pTable, &oper1);
|
||||
|
||||
SSdbOperDesc oper2 = {
|
||||
.rowSize = pHead->len,
|
||||
.rowData = pHead->cont,
|
||||
.table = pTable,
|
||||
};
|
||||
code = (*pTable->decodeFp)(&oper2);
|
||||
if (code < 0) {
|
||||
sdbTrace("table:%s, failed to decode %s record:%s from file, version:%" PRId64, pTable->tableName,
|
||||
sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version);
|
||||
pthread_mutex_unlock(&tsSdbSync->mutex);
|
||||
return code;
|
||||
}
|
||||
code = sdbInsertLocal(pTable, &oper2);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&tsSdbSync->mutex);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int sdbProcessWrite(void *param, void *data, int type) {
|
||||
SWalHead *pHead = data;
|
||||
int32_t code = 0;
|
||||
int32_t tableId = pHead->msgType / 10;
|
||||
int32_t action = pHead->msgType % 10;
|
||||
|
||||
|
@ -281,108 +387,9 @@ static int sdbProcessWrite(void *param, void *data, int type) {
|
|||
assert(pTable != NULL);
|
||||
|
||||
if (pHead->version == 0) {
|
||||
// from mgmt, update version
|
||||
pthread_mutex_lock(&tsSdbSync->mutex);
|
||||
tsSdbSync->version++;
|
||||
pHead->version = tsSdbSync->version;
|
||||
|
||||
code = sdbForwardDbReqToPeer(pHead);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pthread_mutex_unlock(&tsSdbSync->mutex);
|
||||
sdbError("table:%s, failed to forward %s record:%s from file, version:%" PRId64 ", reason:%s", pTable->tableName,
|
||||
sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
code = walWrite(tsSdbSync->pWal, pHead);
|
||||
pthread_mutex_unlock(&tsSdbSync->mutex);
|
||||
|
||||
if (code < 0) {
|
||||
sdbError("table:%s, failed to %s record:%s to file, version:%" PRId64 ", reason:%s", pTable->tableName,
|
||||
sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version, tstrerror(code));
|
||||
} else {
|
||||
sdbTrace("table:%s, success to %s record:%s to file, version:%" PRId64, pTable->tableName, sdbGetActionStr(action),
|
||||
sdbGetkeyStr(pTable, pHead->cont), pHead->version);
|
||||
}
|
||||
|
||||
walFsync(tsSdbSync->pWal);
|
||||
free(pHead);
|
||||
|
||||
return code;
|
||||
return sdbProcessWriteFromApp(pTable, pHead, action);
|
||||
} else {
|
||||
// for data from WAL or forward, version may be smaller
|
||||
pthread_mutex_lock(&tsSdbSync->mutex);
|
||||
|
||||
if (pHead->version <= tsSdbSync->version) {
|
||||
pthread_mutex_unlock(&tsSdbSync->mutex);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (pHead->version != tsSdbSync->version + 1) {
|
||||
pthread_mutex_unlock(&tsSdbSync->mutex);
|
||||
sdbError("table:%s, failed to restore %s record:%s from file, version:%" PRId64 " too large, sdb version:%" PRId64,
|
||||
pTable->tableName, sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version,
|
||||
tsSdbSync->version);
|
||||
return TSDB_CODE_OTHERS;
|
||||
} else {
|
||||
tsSdbSync->version = pHead->version;
|
||||
sdbTrace("table:%s, success to restore %s record:%s from file, version:%" PRId64, pTable->tableName,
|
||||
sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version);
|
||||
}
|
||||
|
||||
|
||||
|
||||
code = -1;
|
||||
if (action == SDB_ACTION_INSERT) {
|
||||
SSdbOperDesc oper = {
|
||||
.rowSize = pHead->len,
|
||||
.rowData = pHead->cont,
|
||||
.table = pTable,
|
||||
};
|
||||
code = (*pTable->decodeFp)(&oper);
|
||||
if (code < 0) {
|
||||
sdbTrace("table:%s, failed to decode %s record:%s from file, version:%" PRId64, pTable->tableName,
|
||||
sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version);
|
||||
pthread_mutex_unlock(&tsSdbSync->mutex);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = sdbInsertLocal(pTable, &oper);
|
||||
} else if (action == SDB_ACTION_DELETE) {
|
||||
SRowMeta *rowMeta = sdbGetRowMeta(pTable, pHead->cont);
|
||||
assert(rowMeta != NULL && rowMeta->row != NULL);
|
||||
|
||||
SSdbOperDesc oper = {
|
||||
.table = pTable,
|
||||
.pObj = rowMeta->row,
|
||||
};
|
||||
|
||||
code = sdbDeleteLocal(pTable, &oper);
|
||||
} else if (action == SDB_ACTION_UPDATE) {
|
||||
SRowMeta *rowMeta = sdbGetRowMeta(pTable, pHead->cont);
|
||||
assert(rowMeta != NULL && rowMeta->row != NULL);
|
||||
|
||||
SSdbOperDesc oper1 = {
|
||||
.table = pTable,
|
||||
.pObj = rowMeta->row,
|
||||
};
|
||||
sdbDeleteLocal(pTable, &oper1);
|
||||
|
||||
SSdbOperDesc oper2 = {
|
||||
.rowSize = pHead->len,
|
||||
.rowData = pHead->cont,
|
||||
.table = pTable,
|
||||
};
|
||||
code = (*pTable->decodeFp)(&oper2);
|
||||
if (code < 0) {
|
||||
sdbTrace("table:%s, failed to decode %s record:%s from file, version:%" PRId64, pTable->tableName,
|
||||
sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version);
|
||||
pthread_mutex_unlock(&tsSdbSync->mutex);
|
||||
return code;
|
||||
}
|
||||
code = sdbInsertLocal(pTable, &oper2);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&tsSdbSync->mutex);
|
||||
return code;
|
||||
return sdbProcessWriteFromWal(pTable, pHead, action);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -396,7 +403,7 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) {
|
|||
return TSDB_CODE_ALREADY_THERE;
|
||||
}
|
||||
|
||||
if (pTable->keyType == SDB_KEY_TYPE_AUTO) {
|
||||
if (pTable->keyType == SDB_KEY_AUTO) {
|
||||
pthread_mutex_lock(&pTable->mutex);
|
||||
*((uint32_t *)pOper->pObj) = ++pTable->autoIndex;
|
||||
|
||||
|
@ -407,7 +414,7 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) {
|
|||
pthread_mutex_unlock(&pTable->mutex);
|
||||
}
|
||||
|
||||
if (pOper->type == SDB_OPER_TYPE_GLOBAL) {
|
||||
if (pOper->type == SDB_OPER_GLOBAL) {
|
||||
int32_t size = sizeof(SWalHead) + pTable->maxRowSize;
|
||||
SWalHead *pHead = calloc(1, size);
|
||||
pHead->version = 0;
|
||||
|
@ -439,13 +446,13 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) {
|
|||
void * pMetaRow = pMeta->row;
|
||||
assert(pMetaRow != NULL);
|
||||
|
||||
if (pOper->type == SDB_OPER_TYPE_GLOBAL) {
|
||||
if (pOper->type == SDB_OPER_GLOBAL) {
|
||||
int32_t rowSize = 0;
|
||||
switch (pTable->keyType) {
|
||||
case SDB_KEY_TYPE_STRING:
|
||||
case SDB_KEY_STRING:
|
||||
rowSize = strlen((char *)pOper->pObj) + 1;
|
||||
break;
|
||||
case SDB_KEY_TYPE_AUTO:
|
||||
case SDB_KEY_AUTO:
|
||||
rowSize = sizeof(uint64_t);
|
||||
break;
|
||||
default:
|
||||
|
@ -479,7 +486,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) {
|
|||
void * pMetaRow = pMeta->row;
|
||||
assert(pMetaRow != NULL);
|
||||
|
||||
if (pOper->type == SDB_OPER_TYPE_GLOBAL) {
|
||||
if (pOper->type == SDB_OPER_GLOBAL) {
|
||||
int32_t size = sizeof(SWalHead) + pTable->maxRowSize;
|
||||
SWalHead *pHead = calloc(1, size);
|
||||
pHead->version = 0;
|
||||
|
|
|
@ -240,7 +240,7 @@ static int32_t mgmtChildTableActionUpdateAll() {
|
|||
if (pDb == NULL) {
|
||||
mError("ctable:%s, failed to get db, discard it", pTable->info.tableId);
|
||||
SSdbOperDesc desc = {0};
|
||||
desc.type = SDB_OPER_TYPE_LOCAL;
|
||||
desc.type = SDB_OPER_LOCAL;
|
||||
desc.pObj = pTable;
|
||||
desc.table = tsChildTableSdb;
|
||||
sdbDeleteRow(&desc);
|
||||
|
@ -254,7 +254,7 @@ static int32_t mgmtChildTableActionUpdateAll() {
|
|||
mError("ctable:%s, failed to get vgroup:%d sid:%d, discard it", pTable->info.tableId, pTable->vgId, pTable->sid);
|
||||
pTable->vgId = 0;
|
||||
SSdbOperDesc desc = {0};
|
||||
desc.type = SDB_OPER_TYPE_LOCAL;
|
||||
desc.type = SDB_OPER_LOCAL;
|
||||
desc.pObj = pTable;
|
||||
desc.table = tsChildTableSdb;
|
||||
sdbDeleteRow(&desc);
|
||||
|
@ -268,7 +268,7 @@ static int32_t mgmtChildTableActionUpdateAll() {
|
|||
pTable->info.tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->sid);
|
||||
pTable->vgId = 0;
|
||||
SSdbOperDesc desc = {0};
|
||||
desc.type = SDB_OPER_TYPE_LOCAL;
|
||||
desc.type = SDB_OPER_LOCAL;
|
||||
desc.pObj = pTable;
|
||||
desc.table = tsChildTableSdb;
|
||||
sdbDeleteRow(&desc);
|
||||
|
@ -280,7 +280,7 @@ static int32_t mgmtChildTableActionUpdateAll() {
|
|||
mError("ctable:%s, vgroup:%d tableList is null", pTable->info.tableId, pTable->vgId);
|
||||
pTable->vgId = 0;
|
||||
SSdbOperDesc desc = {0};
|
||||
desc.type = SDB_OPER_TYPE_LOCAL;
|
||||
desc.type = SDB_OPER_LOCAL;
|
||||
desc.pObj = pTable;
|
||||
desc.table = tsChildTableSdb;
|
||||
sdbDeleteRow(&desc);
|
||||
|
@ -294,7 +294,7 @@ static int32_t mgmtChildTableActionUpdateAll() {
|
|||
mError("ctable:%s, stable:%s not exist", pTable->info.tableId, pTable->superTableId);
|
||||
pTable->vgId = 0;
|
||||
SSdbOperDesc desc = {0};
|
||||
desc.type = SDB_OPER_TYPE_LOCAL;
|
||||
desc.type = SDB_OPER_LOCAL;
|
||||
desc.pObj = pTable;
|
||||
desc.table = tsChildTableSdb;
|
||||
sdbDeleteRow(&desc);
|
||||
|
@ -318,7 +318,7 @@ static int32_t mgmtInitChildTables() {
|
|||
.hashSessions = tsMaxTables,
|
||||
.maxRowSize = sizeof(SChildTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS,
|
||||
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
|
||||
.keyType = SDB_KEY_TYPE_STRING,
|
||||
.keyType = SDB_KEY_STRING,
|
||||
.insertFp = mgmtChildTableActionInsert,
|
||||
.deleteFp = mgmtChildTableActionDelete,
|
||||
.updateFp = mgmtChildTableActionUpdate,
|
||||
|
@ -433,7 +433,7 @@ static int32_t mgmtInitSuperTables() {
|
|||
.hashSessions = TSDB_MAX_SUPER_TABLES,
|
||||
.maxRowSize = tsSuperTableUpdateSize + sizeof(SSchema) * TSDB_MAX_COLUMNS,
|
||||
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
|
||||
.keyType = SDB_KEY_TYPE_STRING,
|
||||
.keyType = SDB_KEY_STRING,
|
||||
.insertFp = mgmtSuperTableActionInsert,
|
||||
.deleteFp = mgmtSuperTableActionDelete,
|
||||
.updateFp = mgmtSuperTableActionUpdate,
|
||||
|
@ -676,7 +676,7 @@ static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg) {
|
|||
}
|
||||
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable,
|
||||
.rowSize = sizeof(SSuperTableObj) + schemaSize
|
||||
|
@ -700,7 +700,7 @@ static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) {
|
|||
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OTHERS);
|
||||
} else {
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable
|
||||
};
|
||||
|
@ -751,7 +751,7 @@ static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], i
|
|||
pStable->sversion++;
|
||||
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable,
|
||||
.rowSize = tsSuperTableUpdateSize
|
||||
|
@ -782,7 +782,7 @@ static int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) {
|
|||
pStable->schema = realloc(pStable->schema, schemaSize);
|
||||
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable,
|
||||
.rowSize = tsSuperTableUpdateSize
|
||||
|
@ -817,7 +817,7 @@ static int32_t mgmtModifySuperTableTagName(SSuperTableObj *pStable, char *oldTag
|
|||
strncpy(schema->name, newTagName, TSDB_COL_NAME_LEN);
|
||||
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable,
|
||||
.rowSize = tsSuperTableUpdateSize
|
||||
|
@ -876,7 +876,7 @@ static int32_t mgmtAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SSc
|
|||
}
|
||||
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable,
|
||||
.rowSize = tsSuperTableUpdateSize
|
||||
|
@ -913,7 +913,7 @@ static int32_t mgmtDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, ch
|
|||
}
|
||||
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable,
|
||||
.rowSize = tsSuperTableUpdateSize
|
||||
|
@ -1061,7 +1061,7 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) {
|
|||
|
||||
if (strncmp(pDropDb->name, pTable->info.tableId, dbNameLen) == 0) {
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_LOCAL,
|
||||
.type = SDB_OPER_LOCAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pTable,
|
||||
};
|
||||
|
@ -1276,7 +1276,7 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj
|
|||
}
|
||||
|
||||
SSdbOperDesc desc = {0};
|
||||
desc.type = SDB_OPER_TYPE_GLOBAL;
|
||||
desc.type = SDB_OPER_GLOBAL;
|
||||
desc.pObj = pTable;
|
||||
desc.table = tsChildTableSdb;
|
||||
|
||||
|
@ -1430,7 +1430,7 @@ static int32_t mgmtAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SSc
|
|||
}
|
||||
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsChildTableSdb,
|
||||
.pObj = pTable,
|
||||
.rowSize = tsChildTableUpdateSize
|
||||
|
@ -1464,7 +1464,7 @@ static int32_t mgmtDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, ch
|
|||
}
|
||||
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsChildTableSdb,
|
||||
.pObj = pTable,
|
||||
.rowSize = tsChildTableUpdateSize
|
||||
|
@ -1606,7 +1606,7 @@ void mgmtDropAllChildTables(SDbObj *pDropDb) {
|
|||
|
||||
if (strncmp(pDropDb->name, pTable->info.tableId, dbNameLen) == 0) {
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_LOCAL,
|
||||
.type = SDB_OPER_LOCAL,
|
||||
.table = tsChildTableSdb,
|
||||
.pObj = pTable,
|
||||
};
|
||||
|
@ -1635,7 +1635,7 @@ static void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) {
|
|||
|
||||
if (pTable->superTable == pStable) {
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_LOCAL,
|
||||
.type = SDB_OPER_LOCAL,
|
||||
.table = tsChildTableSdb,
|
||||
.pObj = pTable,
|
||||
};
|
||||
|
@ -1724,7 +1724,7 @@ static void mgmtProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
|
|||
}
|
||||
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsChildTableSdb,
|
||||
.pObj = pTable
|
||||
};
|
||||
|
@ -1767,7 +1767,7 @@ static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
|
|||
queueMsg->thandle, tstrerror(rpcMsg->code));
|
||||
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsChildTableSdb,
|
||||
.pObj = pTable
|
||||
};
|
||||
|
|
|
@ -104,7 +104,7 @@ int32_t mgmtInitUsers() {
|
|||
.hashSessions = TSDB_MAX_USERS,
|
||||
.maxRowSize = tsUserUpdateSize,
|
||||
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
|
||||
.keyType = SDB_KEY_TYPE_STRING,
|
||||
.keyType = SDB_KEY_STRING,
|
||||
.insertFp = mgmtUserActionInsert,
|
||||
.deleteFp = mgmtUserActionDelete,
|
||||
.updateFp = mgmtUserActionUpdate,
|
||||
|
@ -144,7 +144,7 @@ void mgmtReleaseUser(SUserObj *pUser) {
|
|||
|
||||
static int32_t mgmtUpdateUser(SUserObj *pUser) {
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsUserSdb,
|
||||
.pObj = pUser,
|
||||
.rowSize = tsUserUpdateSize
|
||||
|
@ -192,7 +192,7 @@ int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) {
|
|||
}
|
||||
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsUserSdb,
|
||||
.pObj = pUser,
|
||||
.rowSize = sizeof(SUserObj)
|
||||
|
@ -209,7 +209,7 @@ int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) {
|
|||
|
||||
static int32_t mgmtDropUser(SUserObj *pUser) {
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsUserSdb,
|
||||
.pObj = pUser
|
||||
};
|
||||
|
@ -483,7 +483,7 @@ void mgmtDropAllUsers(SAcctObj *pAcct) {
|
|||
|
||||
if (strncmp(pUser->acct, pAcct->user, acctNameLen) == 0) {
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_LOCAL,
|
||||
.type = SDB_OPER_LOCAL,
|
||||
.table = tsUserSdb,
|
||||
.pObj = pUser,
|
||||
};
|
||||
|
|
|
@ -166,7 +166,7 @@ int32_t mgmtInitVgroups() {
|
|||
.hashSessions = TSDB_MAX_VGROUPS,
|
||||
.maxRowSize = tsVgUpdateSize,
|
||||
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
|
||||
.keyType = SDB_KEY_TYPE_AUTO,
|
||||
.keyType = SDB_KEY_AUTO,
|
||||
.insertFp = mgmtVgroupActionInsert,
|
||||
.deleteFp = mgmtVgroupActionDelete,
|
||||
.updateFp = mgmtVgroupActionUpdate,
|
||||
|
@ -203,7 +203,7 @@ SVgObj *mgmtGetVgroup(int32_t vgId) {
|
|||
|
||||
void mgmtUpdateVgroup(SVgObj *pVgroup) {
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsVgroupSdb,
|
||||
.pObj = pVgroup,
|
||||
.rowSize = tsVgUpdateSize
|
||||
|
@ -247,7 +247,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) {
|
|||
}
|
||||
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsVgroupSdb,
|
||||
.pObj = pVgroup,
|
||||
.rowSize = sizeof(SVgObj)
|
||||
|
@ -279,7 +279,7 @@ void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle) {
|
|||
mTrace("vgroup:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes);
|
||||
mgmtSendDropVgroupMsg(pVgroup, NULL);
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsVgroupSdb,
|
||||
.pObj = pVgroup
|
||||
};
|
||||
|
@ -586,7 +586,7 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
|
|||
mgmtAddToShellQueue(newMsg);
|
||||
} else {
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsVgroupSdb,
|
||||
.pObj = pVgroup
|
||||
};
|
||||
|
@ -649,7 +649,7 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
|
|||
if (queueMsg->received != queueMsg->expected) return;
|
||||
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsVgroupSdb,
|
||||
.pObj = pVgroup
|
||||
};
|
||||
|
@ -706,7 +706,7 @@ void mgmtDropAllVgroups(SDbObj *pDropDb) {
|
|||
|
||||
if (strncmp(pDropDb->name, pVgroup->dbName, dbNameLen) == 0) {
|
||||
SSdbOperDesc oper = {
|
||||
.type = SDB_OPER_TYPE_LOCAL,
|
||||
.type = SDB_OPER_LOCAL,
|
||||
.table = tsVgroupSdb,
|
||||
.pObj = pVgroup,
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue