commit
d26d41e719
|
@ -63,7 +63,6 @@ typedef struct _SSdbTable {
|
|||
int32_t (*encodeFp)(SSdbOper *pOper);
|
||||
int32_t (*destroyFp)(SSdbOper *pOper);
|
||||
int32_t (*restoredFp)();
|
||||
pthread_mutex_t mutex;
|
||||
} SSdbTable;
|
||||
|
||||
typedef struct {
|
||||
|
@ -429,24 +428,18 @@ static SSdbRow *sdbGetRowMetaFromObj(SSdbTable *pTable, void *key) {
|
|||
|
||||
void *sdbGetRow(void *handle, void *key) {
|
||||
SSdbTable *pTable = (SSdbTable *)handle;
|
||||
SSdbRow * pMeta;
|
||||
|
||||
if (handle == NULL) return NULL;
|
||||
|
||||
pthread_mutex_lock(&pTable->mutex);
|
||||
|
||||
int32_t keySize = sizeof(int32_t);
|
||||
if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
|
||||
keySize = strlen((char *)key);
|
||||
}
|
||||
pMeta = taosHashGet(pTable->iHandle, key, keySize);
|
||||
|
||||
if (pMeta) sdbIncRef(pTable, pMeta->row);
|
||||
pthread_mutex_unlock(&pTable->mutex);
|
||||
|
||||
if (pMeta == NULL) return NULL;
|
||||
|
||||
return pMeta->row;
|
||||
SSdbRow *pMeta = taosHashGet(pTable->iHandle, key, keySize);
|
||||
if (pMeta) {
|
||||
sdbIncRef(pTable, pMeta->row);
|
||||
return pMeta->row;
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static void *sdbGetRowFromObj(SSdbTable *pTable, void *key) {
|
||||
|
@ -458,8 +451,6 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
|
|||
rowMeta.rowSize = pOper->rowSize;
|
||||
rowMeta.row = pOper->pObj;
|
||||
|
||||
pthread_mutex_lock(&pTable->mutex);
|
||||
|
||||
void * key = sdbGetObjKey(pTable, pOper->pObj);
|
||||
int32_t keySize = sizeof(int32_t);
|
||||
|
||||
|
@ -470,16 +461,14 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
|
|||
taosHashPut(pTable->iHandle, key, keySize, &rowMeta, sizeof(SSdbRow));
|
||||
|
||||
sdbIncRef(pTable, pOper->pObj);
|
||||
pTable->numOfRows++;
|
||||
atomic_add_fetch_32(&pTable->numOfRows, 1);
|
||||
|
||||
if (pTable->keyType == SDB_KEY_AUTO) {
|
||||
pTable->autoIndex = MAX(pTable->autoIndex, *((uint32_t *)pOper->pObj));
|
||||
} else {
|
||||
pTable->autoIndex++;
|
||||
atomic_add_fetch_32(&pTable->autoIndex, 1);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&pTable->mutex);
|
||||
|
||||
sdbTrace("table:%s, insert record:%s to hash, rowSize:%d numOfRows:%" PRId64 " version:%" PRIu64, pTable->tableName,
|
||||
sdbGetKeyStrFromObj(pTable, pOper->pObj), pOper->rowSize, pTable->numOfRows, sdbGetVersion());
|
||||
|
||||
|
@ -490,19 +479,14 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
|
|||
static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) {
|
||||
(*pTable->deleteFp)(pOper);
|
||||
|
||||
pthread_mutex_lock(&pTable->mutex);
|
||||
|
||||
void * key = sdbGetObjKey(pTable, pOper->pObj);
|
||||
int32_t keySize = sizeof(int32_t);
|
||||
|
||||
if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
|
||||
keySize = strlen((char *)key);
|
||||
}
|
||||
|
||||
taosHashRemove(pTable->iHandle, key, keySize);
|
||||
|
||||
pTable->numOfRows--;
|
||||
pthread_mutex_unlock(&pTable->mutex);
|
||||
atomic_sub_fetch_32(&pTable->numOfRows, 1);
|
||||
|
||||
sdbTrace("table:%s, delete record:%s from hash, numOfRows:%" PRId64 " version:%" PRIu64, pTable->tableName,
|
||||
sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, sdbGetVersion());
|
||||
|
@ -608,14 +592,12 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
|
|||
}
|
||||
|
||||
if (pTable->keyType == SDB_KEY_AUTO) {
|
||||
pthread_mutex_lock(&pTable->mutex);
|
||||
*((uint32_t *)pOper->pObj) = ++pTable->autoIndex;
|
||||
*((uint32_t *)pOper->pObj) = atomic_add_fetch_32(&pTable->autoIndex, 1);
|
||||
|
||||
// let vgId increase from 2
|
||||
if (pTable->autoIndex == 1 && strcmp(pTable->tableName, "vgroups") == 0) {
|
||||
*((uint32_t *)pOper->pObj) = ++pTable->autoIndex;
|
||||
*((uint32_t *)pOper->pObj) = atomic_add_fetch_32(&pTable->autoIndex, 1);
|
||||
}
|
||||
pthread_mutex_unlock(&pTable->mutex);
|
||||
}
|
||||
|
||||
int32_t code = sdbInsertHash(pTable, pOper);
|
||||
|
@ -805,8 +787,6 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
|
|||
}
|
||||
pTable->iHandle = taosHashInit(pTable->hashSessions, hashFp, true);
|
||||
|
||||
pthread_mutex_init(&pTable->mutex, NULL);
|
||||
|
||||
tsSdbObj.numOfTables++;
|
||||
tsSdbObj.tableList[pTable->tableId] = pTable;
|
||||
return pTable;
|
||||
|
@ -835,8 +815,6 @@ void sdbCloseTable(void *handle) {
|
|||
taosHashDestroyIter(pIter);
|
||||
taosHashCleanup(pTable->iHandle);
|
||||
|
||||
pthread_mutex_destroy(&pTable->mutex);
|
||||
|
||||
sdbTrace("table:%s, is closed, numOfTables:%d", pTable->tableName, tsSdbObj.numOfTables);
|
||||
free(pTable);
|
||||
}
|
||||
|
|
|
@ -780,6 +780,16 @@ static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
|
||||
SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable;
|
||||
if (pTable != NULL) {
|
||||
mLPrint("app:%p:%p, stable:%s, create result:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
|
||||
tstrerror(code));
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
|
||||
if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR;
|
||||
|
||||
|
@ -819,25 +829,27 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
|
|||
assert(tschema[col].type >= TSDB_DATA_TYPE_BOOL && tschema[col].type <= TSDB_DATA_TYPE_NCHAR);
|
||||
}
|
||||
|
||||
pMsg->pTable = (STableObj *)pStable;
|
||||
mnodeIncTableRef(pMsg->pTable);
|
||||
|
||||
SSdbOper oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable,
|
||||
.rowSize = sizeof(SSuperTableObj) + schemaSize,
|
||||
.pMsg = pMsg
|
||||
.pMsg = pMsg,
|
||||
.cb = mnodeCreateSuperTableCb
|
||||
};
|
||||
|
||||
int32_t code = sdbInsertRow(&oper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
mnodeDestroySuperTable(pStable);
|
||||
pMsg->pTable = NULL;
|
||||
mError("app:%p:%p, table:%s, failed to create, sdb error", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId);
|
||||
return code;
|
||||
} else {
|
||||
mLPrint("app:%p:%p, table:%s, is created, tags:%d fields:%d", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId,
|
||||
pStable->numOfTags, pStable->numOfColumns);
|
||||
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
|
||||
|
@ -1535,10 +1547,16 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO
|
|||
}
|
||||
|
||||
static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
|
||||
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
|
||||
if (pTable != NULL) {
|
||||
mTrace("app:%p:%p, table:%s, create table in id:%d, uid:%" PRIu64 ", result:%s", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pTable->info.tableId, pTable->sid, pTable->uid, tstrerror(code));
|
||||
}
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) return code;
|
||||
|
||||
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
|
||||
SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, (SChildTableObj *)pMsg->pTable);
|
||||
SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, pTable);
|
||||
if (pMDCreate == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
@ -1639,16 +1657,13 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
|
|||
|
||||
int32_t code = sdbInsertRow(&desc);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
free(pTable);
|
||||
mnodeDestroyChildTable(pTable);
|
||||
pMsg->pTable = NULL;
|
||||
mError("app:%p:%p, table:%s, update sdb error, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId,
|
||||
tstrerror(code));
|
||||
pMsg->pTable = NULL;
|
||||
return code;
|
||||
} else {
|
||||
mTrace("app:%p:%p, table:%s, create table in vgroup:%d, id:%d, uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg,
|
||||
pTable->info.tableId, pVgroup->vgId, pTable->sid, pTable->uid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
|
||||
|
|
|
@ -95,7 +95,7 @@ void createDbAndTable() {
|
|||
pError("failed to use db, code:%d reason:%s", taos_errno(con), taos_errstr(con));
|
||||
exit(0);
|
||||
}
|
||||
taos_stop_query(pSql);
|
||||
taos_free_result(pSql);
|
||||
|
||||
gettimeofday(&systemTime, NULL);
|
||||
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
|
@ -114,7 +114,7 @@ void createDbAndTable() {
|
|||
pError("failed to create stable, code:%d reason:%s", taos_errno(con), taos_errstr(con));
|
||||
exit(0);
|
||||
}
|
||||
taos_stop_query(pSql);
|
||||
taos_free_result(pSql);
|
||||
|
||||
for (int64_t t = 0; t < totalTables; ++t) {
|
||||
sprintf(qstr, "create table if not exists %s%ld using %s tags(%ld)", stableName, t, stableName, t);
|
||||
|
@ -124,7 +124,7 @@ void createDbAndTable() {
|
|||
pError("failed to create table %s%" PRId64 ", reason:%s", stableName, t, taos_errstr(con));
|
||||
exit(0);
|
||||
}
|
||||
taos_stop_query(pSql);
|
||||
taos_free_result(pSql);
|
||||
}
|
||||
} else {
|
||||
for (int64_t t = 0; t < totalTables; ++t) {
|
||||
|
@ -140,7 +140,7 @@ void createDbAndTable() {
|
|||
pError("failed to create table %s%ld, reason:%s", stableName, t, taos_errstr(con));
|
||||
exit(0);
|
||||
}
|
||||
taos_stop_query(pSql);
|
||||
taos_free_result(pSql);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -148,6 +148,7 @@ void createDbAndTable() {
|
|||
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
float seconds = (et - st) / 1000.0 / 1000.0;
|
||||
pPrint("%.1f seconds to create %ld tables, speed:%.1f", seconds, totalTables, totalTables / seconds);
|
||||
taos_close(con);
|
||||
}
|
||||
|
||||
void insertData() {
|
||||
|
@ -257,7 +258,7 @@ void *syncTest(void *param) {
|
|||
pError("thread:%d, failed to insert table:%s%ld row:%ld, reason:%s", pInfo->threadIndex, pInfo->stableName,
|
||||
table, row, taos_errstr(con));
|
||||
}
|
||||
taos_stop_query(pSql);
|
||||
taos_free_result(pSql);
|
||||
|
||||
// "insert into"
|
||||
len = sprintf(sql, "%s", inserStr);
|
||||
|
|
Loading…
Reference in New Issue