fix:error in multi thread in schemaless
This commit is contained in:
parent
40b841f704
commit
9bccbbf865
|
@ -473,6 +473,21 @@ static int32_t smlProcessSchemaAction(SSmlHandle* info, SSchema* schemaField, SH
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t smlCheckMeta(SSchema* schema, int32_t length, SArray* cols){
|
||||
SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
for(uint16_t i = 0; i < length; i++){
|
||||
taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES);
|
||||
}
|
||||
|
||||
for(int32_t i = 0; i < taosArrayGetSize(cols); i++){
|
||||
SSmlKv* kv = (SSmlKv*)taosArrayGetP(cols, i);
|
||||
if(taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL){
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t smlModifyDBSchemas(SSmlHandle* info) {
|
||||
int32_t code = 0;
|
||||
SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
|
||||
|
@ -483,6 +498,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
|
|||
while (tableMetaSml) {
|
||||
SSmlSTableMeta* sTableData = *tableMetaSml;
|
||||
STableMeta *pTableMeta = NULL;
|
||||
bool needCheckMeta = false; // for multi thread
|
||||
|
||||
size_t superTableLen = 0;
|
||||
void *superTable = taosHashGetKey(tableMetaSml, &superTableLen);
|
||||
|
@ -533,6 +549,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
needCheckMeta = true;
|
||||
} else {
|
||||
uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code));
|
||||
goto end;
|
||||
|
@ -544,6 +561,20 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
|
|||
uError("SML:0x%"PRIx64" catalogGetSTableMeta failed. super table name %s", info->id, (char*)superTable);
|
||||
goto end;
|
||||
}
|
||||
|
||||
if(needCheckMeta){
|
||||
code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags, sTableData->tags);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%"PRIx64" check tag failed. super table name %s", info->id, (char*)superTable);
|
||||
goto end;
|
||||
}
|
||||
code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%"PRIx64" check cols failed. super table name %s", info->id, (char*)superTable);
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
sTableData->tableMeta = pTableMeta;
|
||||
|
||||
tableMetaSml = (SSmlSTableMeta**)taosHashIterate(info->superTables, tableMetaSml);
|
||||
|
@ -2368,6 +2399,7 @@ static void smlInsertCallback(void* param, void* res, int32_t code) {
|
|||
SRequestObj *pRequest = (SRequestObj *)res;
|
||||
SSmlHandle* info = (SSmlHandle *)param;
|
||||
|
||||
uDebug("SML:0x%"PRIx64" result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf);
|
||||
// lock
|
||||
if(code != TSDB_CODE_SUCCESS){
|
||||
taosThreadSpinLock(&info->params->lock);
|
||||
|
@ -2497,7 +2529,7 @@ end:
|
|||
taosThreadSpinDestroy(¶ms.lock);
|
||||
tsem_destroy(¶ms.sem);
|
||||
((STscObj *)taos)->schemalessType = 0;
|
||||
uDebug("result:%s", request->msgBuf);
|
||||
uDebug("resultend:%s", request->msgBuf);
|
||||
return (TAOS_RES*)request;
|
||||
}
|
||||
|
||||
|
|
|
@ -2119,9 +2119,11 @@ static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SS
|
|||
isOrdered = false;
|
||||
}
|
||||
if (index < 0) {
|
||||
uError("smlBoundColumnData. index:%d", index);
|
||||
return TSDB_CODE_SML_INVALID_DATA;
|
||||
}
|
||||
if (pColList->cols[index].valStat == VAL_STAT_HAS) {
|
||||
uError("smlBoundColumnData. already set. index:%d", index);
|
||||
return TSDB_CODE_SML_INVALID_DATA;
|
||||
}
|
||||
lastColIdx = index;
|
||||
|
|
|
@ -4945,6 +4945,9 @@ static int32_t buildModifyVnodeArray(STranslateContext* pCxt, SAlterTableStmt* p
|
|||
static int32_t rewriteAlterTable(STranslateContext* pCxt, SQuery* pQuery) {
|
||||
SAlterTableStmt* pStmt = (SAlterTableStmt*)pQuery->pRoot;
|
||||
int32_t code = checkSchemalessDb(pCxt, pStmt->dbName);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
STableMeta* pTableMeta = NULL;
|
||||
code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
|
|
Loading…
Reference in New Issue