diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 193ec1924b..c34317437c 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -63,7 +63,6 @@ typedef struct _SSdbTable { int32_t (*encodeFp)(SSdbOper *pOper); int32_t (*destroyFp)(SSdbOper *pOper); int32_t (*restoredFp)(); - pthread_mutex_t mutex; } SSdbTable; typedef struct { @@ -429,24 +428,18 @@ static SSdbRow *sdbGetRowMetaFromObj(SSdbTable *pTable, void *key) { void *sdbGetRow(void *handle, void *key) { SSdbTable *pTable = (SSdbTable *)handle; - SSdbRow * pMeta; - - if (handle == NULL) return NULL; - - pthread_mutex_lock(&pTable->mutex); - int32_t keySize = sizeof(int32_t); if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { keySize = strlen((char *)key); } - pMeta = taosHashGet(pTable->iHandle, key, keySize); - - if (pMeta) sdbIncRef(pTable, pMeta->row); - pthread_mutex_unlock(&pTable->mutex); - - if (pMeta == NULL) return NULL; - - return pMeta->row; + + SSdbRow *pMeta = taosHashGet(pTable->iHandle, key, keySize); + if (pMeta) { + sdbIncRef(pTable, pMeta->row); + return pMeta->row; + } else { + return NULL; + } } static void *sdbGetRowFromObj(SSdbTable *pTable, void *key) { @@ -458,8 +451,6 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) { rowMeta.rowSize = pOper->rowSize; rowMeta.row = pOper->pObj; - pthread_mutex_lock(&pTable->mutex); - void * key = sdbGetObjKey(pTable, pOper->pObj); int32_t keySize = sizeof(int32_t); @@ -470,16 +461,14 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) { taosHashPut(pTable->iHandle, key, keySize, &rowMeta, sizeof(SSdbRow)); sdbIncRef(pTable, pOper->pObj); - pTable->numOfRows++; + atomic_add_fetch_32(&pTable->numOfRows, 1); if (pTable->keyType == SDB_KEY_AUTO) { pTable->autoIndex = MAX(pTable->autoIndex, *((uint32_t *)pOper->pObj)); } else { - pTable->autoIndex++; + atomic_add_fetch_32(&pTable->autoIndex, 1); } - pthread_mutex_unlock(&pTable->mutex); - sdbTrace("table:%s, insert record:%s to hash, rowSize:%d numOfRows:%" PRId64 " version:%" PRIu64, pTable->tableName, sdbGetKeyStrFromObj(pTable, pOper->pObj), pOper->rowSize, pTable->numOfRows, sdbGetVersion()); @@ -490,20 +479,15 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) { static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) { (*pTable->deleteFp)(pOper); - pthread_mutex_lock(&pTable->mutex); - void * key = sdbGetObjKey(pTable, pOper->pObj); int32_t keySize = sizeof(int32_t); - if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { keySize = strlen((char *)key); } taosHashRemove(pTable->iHandle, key, keySize); - - pTable->numOfRows--; - pthread_mutex_unlock(&pTable->mutex); - + atomic_sub_fetch_32(&pTable->numOfRows, 1); + sdbTrace("table:%s, delete record:%s from hash, numOfRows:%" PRId64 " version:%" PRIu64, pTable->tableName, sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, sdbGetVersion()); @@ -608,14 +592,12 @@ int32_t sdbInsertRow(SSdbOper *pOper) { } if (pTable->keyType == SDB_KEY_AUTO) { - pthread_mutex_lock(&pTable->mutex); - *((uint32_t *)pOper->pObj) = ++pTable->autoIndex; + *((uint32_t *)pOper->pObj) = atomic_add_fetch_32(&pTable->autoIndex, 1); // let vgId increase from 2 if (pTable->autoIndex == 1 && strcmp(pTable->tableName, "vgroups") == 0) { - *((uint32_t *)pOper->pObj) = ++pTable->autoIndex; + *((uint32_t *)pOper->pObj) = atomic_add_fetch_32(&pTable->autoIndex, 1); } - pthread_mutex_unlock(&pTable->mutex); } int32_t code = sdbInsertHash(pTable, pOper); @@ -805,8 +787,6 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) { } pTable->iHandle = taosHashInit(pTable->hashSessions, hashFp, true); - pthread_mutex_init(&pTable->mutex, NULL); - tsSdbObj.numOfTables++; tsSdbObj.tableList[pTable->tableId] = pTable; return pTable; @@ -835,8 +815,6 @@ void sdbCloseTable(void *handle) { taosHashDestroyIter(pIter); taosHashCleanup(pTable->iHandle); - pthread_mutex_destroy(&pTable->mutex); - sdbTrace("table:%s, is closed, numOfTables:%d", pTable->tableName, tsSdbObj.numOfTables); free(pTable); } diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 565ce9910e..346c33f80d 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -780,6 +780,16 @@ static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) { } } +static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) { + SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable; + if (pTable != NULL) { + mLPrint("app:%p:%p, stable:%s, create result:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, + tstrerror(code)); + } + + return code; +} + static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR; @@ -819,25 +829,27 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { assert(tschema[col].type >= TSDB_DATA_TYPE_BOOL && tschema[col].type <= TSDB_DATA_TYPE_NCHAR); } + pMsg->pTable = (STableObj *)pStable; + mnodeIncTableRef(pMsg->pTable); + SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsSuperTableSdb, .pObj = pStable, .rowSize = sizeof(SSuperTableObj) + schemaSize, - .pMsg = pMsg + .pMsg = pMsg, + .cb = mnodeCreateSuperTableCb }; int32_t code = sdbInsertRow(&oper); if (code != TSDB_CODE_SUCCESS) { mnodeDestroySuperTable(pStable); + pMsg->pTable = NULL; mError("app:%p:%p, table:%s, failed to create, sdb error", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId); + return code; } else { - mLPrint("app:%p:%p, table:%s, is created, tags:%d fields:%d", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, - pStable->numOfTags, pStable->numOfColumns); - code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + return TSDB_CODE_MND_ACTION_IN_PROGRESS; } - - return code; } static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { @@ -1535,10 +1547,16 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO } static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { + SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; + if (pTable != NULL) { + mTrace("app:%p:%p, table:%s, create table in id:%d, uid:%" PRIu64 ", result:%s", pMsg->rpcMsg.ahandle, pMsg, + pTable->info.tableId, pTable->sid, pTable->uid, tstrerror(code)); + } + if (code != TSDB_CODE_SUCCESS) return code; - + SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont; - SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, (SChildTableObj *)pMsg->pTable); + SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, pTable); if (pMDCreate == NULL) { return terrno; } @@ -1639,16 +1657,13 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { int32_t code = sdbInsertRow(&desc); if (code != TSDB_CODE_SUCCESS) { - free(pTable); + mnodeDestroyChildTable(pTable); + pMsg->pTable = NULL; mError("app:%p:%p, table:%s, update sdb error, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId, tstrerror(code)); - pMsg->pTable = NULL; - return code; - } else { - mTrace("app:%p:%p, table:%s, create table in vgroup:%d, id:%d, uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, - pTable->info.tableId, pVgroup->vgId, pTable->sid, pTable->uid); - return TSDB_CODE_SUCCESS; - } + } + + return code; } static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { diff --git a/tests/test/c/insertPerTable.c b/tests/test/c/insertPerTable.c index a5e2c4b966..d79df0f4d1 100644 --- a/tests/test/c/insertPerTable.c +++ b/tests/test/c/insertPerTable.c @@ -95,7 +95,7 @@ void createDbAndTable() { pError("failed to use db, code:%d reason:%s", taos_errno(con), taos_errstr(con)); exit(0); } - taos_stop_query(pSql); + taos_free_result(pSql); gettimeofday(&systemTime, NULL); st = systemTime.tv_sec * 1000000 + systemTime.tv_usec; @@ -114,7 +114,7 @@ void createDbAndTable() { pError("failed to create stable, code:%d reason:%s", taos_errno(con), taos_errstr(con)); exit(0); } - taos_stop_query(pSql); + taos_free_result(pSql); for (int64_t t = 0; t < totalTables; ++t) { sprintf(qstr, "create table if not exists %s%ld using %s tags(%ld)", stableName, t, stableName, t); @@ -124,7 +124,7 @@ void createDbAndTable() { pError("failed to create table %s%" PRId64 ", reason:%s", stableName, t, taos_errstr(con)); exit(0); } - taos_stop_query(pSql); + taos_free_result(pSql); } } else { for (int64_t t = 0; t < totalTables; ++t) { @@ -140,7 +140,7 @@ void createDbAndTable() { pError("failed to create table %s%ld, reason:%s", stableName, t, taos_errstr(con)); exit(0); } - taos_stop_query(pSql); + taos_free_result(pSql); } } @@ -148,6 +148,7 @@ void createDbAndTable() { et = systemTime.tv_sec * 1000000 + systemTime.tv_usec; float seconds = (et - st) / 1000.0 / 1000.0; pPrint("%.1f seconds to create %ld tables, speed:%.1f", seconds, totalTables, totalTables / seconds); + taos_close(con); } void insertData() { @@ -257,7 +258,7 @@ void *syncTest(void *param) { pError("thread:%d, failed to insert table:%s%ld row:%ld, reason:%s", pInfo->threadIndex, pInfo->stableName, table, row, taos_errstr(con)); } - taos_stop_query(pSql); + taos_free_result(pSql); // "insert into" len = sprintf(sql, "%s", inserStr);