Merge pull request #16386 from taosdata/enh/updateTbMeta

enh: update table meta cache after creating table
This commit is contained in:
Shengliang Guan 2022-08-25 14:32:43 +08:00 committed by GitHub
commit 4c280dc236
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 547 additions and 63 deletions

View File

@ -442,6 +442,26 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaW
STSchema* tdGetSTSChemaFromSSChema(SSchema* pSchema, int32_t nCols, int32_t sver);
typedef struct {
char tbName[TSDB_TABLE_NAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
int64_t dbId;
int32_t numOfTags;
int32_t numOfColumns;
int8_t precision;
int8_t tableType;
int32_t sversion;
int32_t tversion;
uint64_t suid;
uint64_t tuid;
int32_t vgId;
int8_t sysInfo;
SSchema* pSchemas;
} STableMetaRsp;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t igExists;
@ -473,6 +493,14 @@ int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq
int32_t tDeserializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
void tFreeSMCreateStbReq(SMCreateStbReq* pReq);
typedef struct {
STableMetaRsp* pMeta;
} SMCreateStbRsp;
int32_t tEncodeSMCreateStbRsp(SEncoder* pEncoder, const SMCreateStbRsp* pRsp);
int32_t tDecodeSMCreateStbRsp(SDecoder* pDecoder, SMCreateStbRsp* pRsp);
void tFreeSMCreateStbRsp(SMCreateStbRsp* pRsp);
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t igNotExists;
@ -1241,24 +1269,6 @@ typedef struct {
SVgroupInfo vgroups[];
} SVgroupsInfo;
typedef struct {
char tbName[TSDB_TABLE_NAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
int64_t dbId;
int32_t numOfTags;
int32_t numOfColumns;
int8_t precision;
int8_t tableType;
int32_t sversion;
int32_t tversion;
uint64_t suid;
uint64_t tuid;
int32_t vgId;
int8_t sysInfo;
SSchema* pSchemas;
} STableMetaRsp;
typedef struct {
STableMetaRsp* pMeta;
} SMAlterStbRsp;
@ -1269,7 +1279,7 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp);
int32_t tSerializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
int32_t tDeserializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
void tFreeSTableMetaRsp(STableMetaRsp* pRsp);
void tFreeSTableMetaRsp(void* pRsp);
void tFreeSTableIndexRsp(void* info);
typedef struct {
@ -2031,11 +2041,13 @@ int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq);
int tDecodeSVCreateTbBatchReq(SDecoder* pCoder, SVCreateTbBatchReq* pReq);
typedef struct {
int32_t code;
int32_t code;
STableMetaRsp* pMeta;
} SVCreateTbRsp, SVUpdateTbRsp;
int tEncodeSVCreateTbRsp(SEncoder* pCoder, const SVCreateTbRsp* pRsp);
int tDecodeSVCreateTbRsp(SDecoder* pCoder, SVCreateTbRsp* pRsp);
void tFreeSVCreateTbRsp(void* param);
int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);

View File

@ -215,6 +215,7 @@ void initQueryModuleMsgHandle();
const SSchema* tGetTbnameColumnSchema();
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
int32_t queryCreateCTableMetaFromMsg(STableMetaRsp *msg, SCTableMeta *pMeta);
int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta);
char* jobTaskStatusStr(int32_t status);

View File

@ -210,6 +210,8 @@ void taosHashSetEqualFp(SHashObj *pHashObj, _equal_fn_t fp);
*/
void taosHashSetFreeFp(SHashObj *pHashObj, _hash_free_fn_t fp);
int64_t taosHashGetCompTimes(SHashObj *pHashObj);
#ifdef __cplusplus
}
#endif

View File

@ -369,8 +369,9 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData*
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList);
void doAsyncQuery(SRequestObj* pRequest, bool forceUpdateMeta);
int32_t removeMeta(STscObj* pTscObj, SArray* tbList); // todo move to clientImpl.c and become a static function
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); // todo move to xxx
int32_t removeMeta(STscObj* pTscObj, SArray* tbList);
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog);
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog);
bool qnodeRequired(SRequestObj* pRequest);
#ifdef __cplusplus

View File

@ -782,6 +782,10 @@ int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
}
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
}
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
if (NULL == pRequest->body.resInfo.execRes.res) {
return TSDB_CODE_SUCCESS;
@ -804,6 +808,19 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
code = handleAlterTbExecRes(pRes->res, pCatalog);
break;
}
case TDMT_VND_CREATE_TABLE: {
SArray* pList = (SArray*)pRes->res;
int32_t num = taosArrayGetSize(pList);
for (int32_t i = 0; i < num; ++i) {
void* res = taosArrayGetP(pList, i);
code = handleCreateTbExecRes(res, pCatalog);
}
break;
}
case TDMT_MND_CREATE_STB: {
code = handleCreateTbExecRes(pRes->res, pCatalog);
break;
}
case TDMT_VND_SUBMIT: {
atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);
@ -863,17 +880,13 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
return;
}
if (code == TSDB_CODE_SUCCESS) {
code = handleQueryExecRsp(pRequest);
ASSERT(pRequest->code == TSDB_CODE_SUCCESS);
pRequest->code = code;
}
tscDebug("schedulerExecCb request type %s", TMSG_INFO(pRequest->type));
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
removeMeta(pTscObj, pRequest->targetTableList);
}
handleQueryExecRsp(pRequest);
// return to client
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
}
@ -934,6 +947,10 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
qDestroyQuery(pQuery);
}
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
removeMeta(pRequest->pTscObj, pRequest->targetTableList);
}
handleQueryExecRsp(pRequest);
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
@ -1132,10 +1149,6 @@ SRequestObj* execQuery(uint64_t connId, const char* sql, int sqlLen, bool valida
inRetry = true;
} while (retryNum++ < REQUEST_TOTAL_EXEC_TIMES);
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
removeMeta(pRequest->pTscObj, pRequest->targetTableList);
}
return pRequest;
}

View File

@ -233,13 +233,36 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
assert(pMsg != NULL && param != NULL);
SRequestObj* pRequest = param;
taosMemoryFree(pMsg->pData);
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
} else {
SMCreateStbRsp createRsp = {0};
SDecoder coder = {0};
tDecoderInit(&coder, pMsg->pData, pMsg->len);
tDecodeSMCreateStbRsp(&coder, &createRsp);
tDecoderClear(&coder);
pRequest->body.resInfo.execRes.msgType = TDMT_MND_CREATE_STB;
pRequest->body.resInfo.execRes.res = createRsp.pMeta;
}
taosMemoryFree(pMsg->pData);
if (pRequest->body.queryFp != NULL) {
removeMeta(pRequest->pTscObj, pRequest->tableList);
SExecResult* pRes = &pRequest->body.resInfo.execRes;
if (code == TSDB_CODE_SUCCESS) {
SCatalog* pCatalog = NULL;
int32_t ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (pRes->res != NULL) {
ret = handleCreateTbExecRes(pRes->res, pCatalog);
}
if (ret != TSDB_CODE_SUCCESS) {
code = ret;
}
}
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
} else {
tsem_post(&pRequest->body.rspSem);

View File

@ -3196,12 +3196,16 @@ static int32_t tDecodeSTableMetaRsp(SDecoder *pDecoder, STableMetaRsp *pRsp) {
if (tDecodeI32(pDecoder, &pRsp->vgId) < 0) return -1;
int32_t totalCols = pRsp->numOfTags + pRsp->numOfColumns;
pRsp->pSchemas = taosMemoryMalloc(sizeof(SSchema) * totalCols);
if (pRsp->pSchemas == NULL) return -1;
if (totalCols > 0) {
pRsp->pSchemas = taosMemoryMalloc(sizeof(SSchema) * totalCols);
if (pRsp->pSchemas == NULL) return -1;
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pRsp->pSchemas[i];
if (tDecodeSSchema(pDecoder, pSchema) < 0) return -1;
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pRsp->pSchemas[i];
if (tDecodeSSchema(pDecoder, pSchema) < 0) return -1;
}
} else {
pRsp->pSchemas = NULL;
}
return 0;
@ -3326,7 +3330,7 @@ int32_t tDeserializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
return 0;
}
void tFreeSTableMetaRsp(STableMetaRsp *pRsp) { taosMemoryFreeClear(pRsp->pSchemas); }
void tFreeSTableMetaRsp(void *pRsp) { taosMemoryFreeClear(((STableMetaRsp*)pRsp)->pSchemas); }
void tFreeSTableIndexRsp(void *info) {
if (NULL == info) {
@ -5092,6 +5096,10 @@ int tEncodeSVCreateTbRsp(SEncoder *pCoder, const SVCreateTbRsp *pRsp) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI32(pCoder, pRsp->code) < 0) return -1;
if (tEncodeI32(pCoder, pRsp->pMeta ? 1 : 0) < 0) return -1;
if (pRsp->pMeta) {
if (tEncodeSTableMetaRsp(pCoder, pRsp->pMeta) < 0) return -1;
}
tEndEncode(pCoder);
return 0;
@ -5102,10 +5110,32 @@ int tDecodeSVCreateTbRsp(SDecoder *pCoder, SVCreateTbRsp *pRsp) {
if (tDecodeI32(pCoder, &pRsp->code) < 0) return -1;
int32_t meta = 0;
if (tDecodeI32(pCoder, &meta) < 0) return -1;
if (meta) {
pRsp->pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
if (NULL == pRsp->pMeta) return -1;
if (tDecodeSTableMetaRsp(pCoder, pRsp->pMeta) < 0) return -1;
} else {
pRsp->pMeta = NULL;
}
tEndDecode(pCoder);
return 0;
}
void tFreeSVCreateTbRsp(void* param) {
if (NULL == param) {
return;
}
SVCreateTbRsp* pRsp = (SVCreateTbRsp*)param;
if (pRsp->pMeta) {
taosMemoryFree(pRsp->pMeta->pSchemas);
taosMemoryFree(pRsp->pMeta);
}
}
// TDMT_VND_DROP_TABLE =================
static int32_t tEncodeSVDropTbReq(SEncoder *pCoder, const SVDropTbReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
@ -5560,6 +5590,60 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp *pRsp) {
}
}
int32_t tEncodeSMCreateStbRsp(SEncoder *pEncoder, const SMCreateStbRsp *pRsp) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->pMeta->pSchemas ? 1 : 0) < 0) return -1;
if (pRsp->pMeta->pSchemas) {
if (tEncodeSTableMetaRsp(pEncoder, pRsp->pMeta) < 0) return -1;
}
tEndEncode(pEncoder);
return 0;
}
int32_t tDecodeSMCreateStbRsp(SDecoder *pDecoder, SMCreateStbRsp *pRsp) {
int32_t meta = 0;
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI32(pDecoder, &meta) < 0) return -1;
if (meta) {
pRsp->pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
if (NULL == pRsp->pMeta) return -1;
if (tDecodeSTableMetaRsp(pDecoder, pRsp->pMeta) < 0) return -1;
}
tEndDecode(pDecoder);
return 0;
}
int32_t tDeserializeSMCreateStbRsp(void *buf, int32_t bufLen, SMCreateStbRsp *pRsp) {
int32_t meta = 0;
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &meta) < 0) return -1;
if (meta) {
pRsp->pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
if (NULL == pRsp->pMeta) return -1;
if (tDecodeSTableMetaRsp(&decoder, pRsp->pMeta) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSMCreateStbRsp(SMCreateStbRsp *pRsp) {
if (NULL == pRsp) {
return;
}
if (pRsp->pMeta) {
taosMemoryFree(pRsp->pMeta->pSchemas);
taosMemoryFree(pRsp->pMeta);
}
}
int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal) {
if (tEncodeI8(pEncoder, pOffsetVal->type) < 0) return -1;
if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA) {

View File

@ -35,6 +35,7 @@ SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName);
int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb);
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb);
void mndFreeStb(SStbObj *pStb);
int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char* dbFName, char* stbFName, void **pCont, int32_t *pLen);
void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst);
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize);

View File

@ -1774,6 +1774,67 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, SStbObj *pObj, void **pCont, i
return 0;
}
int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char* dbFName, char* stbFName, void **pCont, int32_t *pLen) {
int32_t ret = -1;
SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
if (NULL == pDb) {
return -1;
}
SStbObj *pObj = mndAcquireStb(pMnode, stbFName);
if (NULL == pObj) {
goto _OVER;
}
SEncoder ec = {0};
uint32_t contLen = 0;
SMCreateStbRsp stbRsp = {0};
SName name = {0};
tNameFromString(&name, pObj->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
stbRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
if (NULL == stbRsp.pMeta) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
ret = mndBuildStbSchemaImp(pDb, pObj, name.tname, stbRsp.pMeta);
if (ret) {
tFreeSMCreateStbRsp(&stbRsp);
goto _OVER;
}
tEncodeSize(tEncodeSMCreateStbRsp, &stbRsp, contLen, ret);
if (ret) {
tFreeSMCreateStbRsp(&stbRsp);
goto _OVER;
}
void *cont = taosMemoryMalloc(contLen);
tEncoderInit(&ec, cont, contLen);
tEncodeSMCreateStbRsp(&ec, &stbRsp);
tEncoderClear(&ec);
tFreeSMCreateStbRsp(&stbRsp);
*pCont = cont;
*pLen = contLen;
ret = 0;
_OVER:
if (pObj) {
mndReleaseStb(pMnode, pObj);
}
if (pDb) {
mndReleaseDb(pMnode, pDb);
}
return ret;
}
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
void *alterOriData, int32_t alterOriDataLen) {
int32_t code = -1;

View File

@ -17,6 +17,7 @@
#include "mndTrans.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndStb.h"
#include "mndPrivilege.h"
#include "mndShow.h"
#include "mndSync.h"
@ -900,15 +901,6 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
}
SRpcMsg rspMsg = {.code = code, .info = *pInfo};
if (pTrans->rpcRspLen != 0) {
void *rpcCont = rpcMallocCont(pTrans->rpcRspLen);
if (rpcCont != NULL) {
memcpy(rpcCont, pTrans->rpcRsp, pTrans->rpcRspLen);
rspMsg.pCont = rpcCont;
rspMsg.contLen = pTrans->rpcRspLen;
}
}
if (pTrans->originRpcType == TDMT_MND_CREATE_DB) {
mDebug("trans:%d, origin msgtype:%s", pTrans->id, TMSG_INFO(pTrans->originRpcType));
SDbObj *pDb = mndAcquireDb(pMnode, pTrans->dbname1);
@ -924,6 +916,21 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
}
}
mndReleaseDb(pMnode, pDb);
} else if (pTrans->originRpcType == TDMT_MND_CREATE_STB) {
void *pCont = NULL;
int32_t contLen = 0;
if (0 == mndBuildSMCreateStbRsp(pMnode, pTrans->dbname1, pTrans->dbname2, &pCont, &contLen) != 0) {
mndTransSetRpcRsp(pTrans, pCont, contLen);
}
}
if (pTrans->rpcRspLen != 0) {
void *rpcCont = rpcMallocCont(pTrans->rpcRspLen);
if (rpcCont != NULL) {
memcpy(rpcCont, pTrans->rpcRsp, pTrans->rpcRspLen);
rspMsg.pCont = rpcCont;
rspMsg.contLen = pTrans->rpcRspLen;
}
}
tmsgSendRsp(&rspMsg);

View File

@ -102,7 +102,7 @@ int metaCommit(SMeta* pMeta);
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp **pMetaRsp);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids);
int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);

View File

@ -367,7 +367,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
return 0;
}
int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMetaRsp **pMetaRsp) {
SMetaEntry me = {0};
SMetaReader mr = {0};
@ -427,6 +427,21 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
if (pMetaRsp) {
*pMetaRsp = taosMemoryCalloc(1, sizeof(STableMetaRsp));
if (*pMetaRsp) {
if (me.type == TSDB_CHILD_TABLE) {
(*pMetaRsp)->tableType = TSDB_CHILD_TABLE;
(*pMetaRsp)->tuid = pReq->uid;
(*pMetaRsp)->suid = pReq->ctb.suid;
strcpy((*pMetaRsp)->tbName, pReq->name);
} else {
metaUpdateMetaRsp(pReq->uid, pReq->name, &pReq->ntb.schemaRow, *pMetaRsp);
}
}
}
metaDebug("vgId:%d, table:%s uid %" PRId64 " is created, type:%" PRId8, TD_VID(pMeta->pVnode), pReq->name, pReq->uid,
pReq->type);
return 0;

View File

@ -368,6 +368,10 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
}
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
if (NULL == pMetaRsp) {
return;
}
strcpy(pMetaRsp->dbFName, pVnode->config.dbname);
pMetaRsp->dbId = pVnode->config.dbId;
pMetaRsp->vgId = TD_VID(pVnode);
@ -512,7 +516,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
}
// do create table
if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) {
if (metaCreateTable(pVnode->pMeta, version, pCreateReq, &cRsp.pMeta) < 0) {
if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
cRsp.code = TSDB_CODE_SUCCESS;
} else {
@ -522,6 +526,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
cRsp.code = TSDB_CODE_SUCCESS;
tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
taosArrayPush(tbUids, &pCreateReq->uid);
vnodeUpdateMetaRsp(pVnode, cRsp.pMeta);
}
taosArrayPush(rsp.pArray, &cRsp);
@ -550,7 +555,7 @@ _exit:
pCreateReq = req.pReqs + iReq;
taosArrayDestroy(pCreateReq->ctb.tagName);
}
taosArrayDestroy(rsp.pArray);
taosArrayDestroyEx(rsp.pArray, tFreeSVCreateTbRsp);
taosArrayDestroy(tbUids);
tDecoderClear(&decoder);
tEncoderClear(&encoder);
@ -862,7 +867,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
goto _exit;
}
if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
if (metaCreateTable(pVnode->pMeta, version, &createTbReq, NULL) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
submitBlkRsp.code = terrno;
pRsp->code = terrno;

View File

@ -270,13 +270,22 @@ int32_t ctgUpdateTbMeta(SCatalog* pCtg, STableMetaRsp* rspMsg, bool syncOp) {
int32_t code = 0;
strcpy(output->dbFName, rspMsg->dbFName);
strcpy(output->tbName, rspMsg->tbName);
output->dbId = rspMsg->dbId;
SET_META_TYPE_TABLE(output->metaType);
if (TSDB_CHILD_TABLE == rspMsg->tableType && NULL == rspMsg->pSchemas) {
strcpy(output->ctbName, rspMsg->tbName);
CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, rspMsg->tableType == TSDB_SUPER_TABLE, &output->tbMeta));
SET_META_TYPE_CTABLE(output->metaType);
CTG_ERR_JRET(queryCreateCTableMetaFromMsg(rspMsg, &output->ctbMeta));
} else {
strcpy(output->tbName, rspMsg->tbName);
SET_META_TYPE_TABLE(output->metaType);
CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, rspMsg->tableType == TSDB_SUPER_TABLE, &output->tbMeta));
}
CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, output, syncOp));

View File

@ -135,7 +135,12 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) {
NODES_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
*len += snprintf(buf + *len, bufSize - *len, "%s", t);
int32_t tlen = strlen(t);
if (tlen > 32) {
*len += snprintf(buf + *len, bufSize - *len, "%.*s...%s", 32, t, t + tlen - 1);
} else {
*len += snprintf(buf + *len, bufSize - *len, "%s", t);
}
taosMemoryFree(t);
return TSDB_CODE_SUCCESS;
@ -199,12 +204,17 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) {
SNodeListNode *pListNode = (SNodeListNode *)pNode;
SNode *node = NULL;
bool first = true;
int32_t num = 0;
*len += snprintf(buf + *len, bufSize - *len, "(");
FOREACH(node, pListNode->pNodeList) {
if (!first) {
*len += snprintf(buf + *len, bufSize - *len, ", ");
if (++num >= 10) {
*len += snprintf(buf + *len, bufSize - *len, "...");
break;
}
}
NODES_ERR_RET(nodesNodeToSQL(node, buf, bufSize, len));
first = false;

View File

@ -213,15 +213,25 @@ SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* nam
return s;
}
void freeSTableMetaRspPointer(void *p) {
tFreeSTableMetaRsp(*(void**)p);
taosMemoryFreeClear(*(void**)p);
}
void destroyQueryExecRes(SExecResult* pRes) {
if (NULL == pRes || NULL == pRes->res) {
return;
}
switch (pRes->msgType) {
case TDMT_VND_CREATE_TABLE: {
taosArrayDestroyEx((SArray*)pRes->res, freeSTableMetaRspPointer);
break;
}
case TDMT_MND_CREATE_STB:
case TDMT_VND_ALTER_TABLE:
case TDMT_MND_ALTER_STB: {
tFreeSTableMetaRsp((STableMetaRsp*)pRes->res);
tFreeSTableMetaRsp(pRes->res);
taosMemoryFreeClear(pRes->res);
break;
}

View File

@ -354,6 +354,19 @@ static int32_t queryConvertTableMetaMsg(STableMetaRsp *pMetaMsg) {
return TSDB_CODE_SUCCESS;
}
int32_t queryCreateCTableMetaFromMsg(STableMetaRsp *msg, SCTableMeta *pMeta) {
pMeta->vgId = msg->vgId;
pMeta->tableType = msg->tableType;
pMeta->uid = msg->tuid;
pMeta->suid = msg->suid;
qDebug("ctable %s uid %" PRIx64 " meta returned, type %d vgId:%d db %s suid %" PRIx64 ,
msg->tbName, pMeta->uid, pMeta->tableType, pMeta->vgId, msg->dbFName, pMeta->suid);
return TSDB_CODE_SUCCESS;
}
int32_t queryCreateTableMetaFromMsg(STableMetaRsp *msg, bool isStb, STableMeta **pMeta) {
int32_t total = msg->numOfColumns + msg->numOfTags;
int32_t metaSize = sizeof(STableMeta) + sizeof(SSchema) * total;

View File

@ -102,15 +102,30 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
tDecoderInit(&coder, msg, msgSize);
code = tDecodeSVCreateTbBatchRsp(&coder, &batchRsp);
if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
SCH_LOCK(SCH_WRITE, &pJob->resLock);
if (NULL == pJob->execRes.res) {
pJob->execRes.res = taosArrayInit(batchRsp.nRsps, POINTER_BYTES);
pJob->execRes.msgType = TDMT_VND_CREATE_TABLE;
}
for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
SVCreateTbRsp *rsp = batchRsp.pRsps + i;
if (rsp->pMeta) {
taosArrayPush((SArray*)pJob->execRes.res, &rsp->pMeta);
}
if (TSDB_CODE_SUCCESS != rsp->code) {
code = rsp->code;
tDecoderClear(&coder);
SCH_ERR_JRET(code);
}
}
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
if (taosArrayGetSize((SArray*)pJob->execRes.res) <= 0) {
taosArrayDestroy((SArray*)pJob->execRes.res);
pJob->execRes.res = NULL;
}
}
tDecoderClear(&coder);
SCH_ERR_JRET(code);
}

View File

@ -21,7 +21,7 @@
// the add ref count operation may trigger the warning if the reference count is greater than the MAX_WARNING_REF_COUNT
#define MAX_WARNING_REF_COUNT 10000
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
#define HASH_MAX_CAPACITY (1024 * 1024 * 1024)
#define HASH_DEFAULT_LOAD_FACTOR (0.75)
#define HASH_INDEX(v, c) ((v) & ((c)-1))
@ -67,6 +67,7 @@ struct SHashObj {
bool enableUpdate; // enable update
SArray *pMemBlock; // memory block allocated for SHashEntry
_hash_before_fn_t callbackFp; // function invoked before return the value to caller
int64_t compTimes;
};
/*
@ -146,6 +147,7 @@ static FORCE_INLINE SHashNode *doSearchInEntryList(SHashObj *pHashObj, SHashEntr
uint32_t hashVal) {
SHashNode *pNode = pe->next;
while (pNode) {
atomic_add_fetch_64(&pHashObj->compTimes, 1);
if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) &&
pNode->removed == 0) {
assert(pNode->hashVal == hashVal);
@ -882,3 +884,7 @@ void *taosHashAcquire(SHashObj *pHashObj, const void *key, size_t keyLen) {
}
void taosHashRelease(SHashObj *pHashObj, void *p) { taosHashCancelIterate(pHashObj, p); }
int64_t taosHashGetCompTimes(SHashObj *pHashObj) { return atomic_load_64(&pHashObj->compTimes); }

View File

@ -197,6 +197,201 @@ void acquireRleaseTest() {
taosMemoryFreeClear(data.p);
}
void perfTest() {
SHashObj* hash1h = (SHashObj*) taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
SHashObj* hash1s = (SHashObj*) taosHashInit(1000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
SHashObj* hash10s = (SHashObj*) taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
SHashObj* hash100s = (SHashObj*) taosHashInit(100000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
SHashObj* hash1m = (SHashObj*) taosHashInit(1000000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
SHashObj* hash10m = (SHashObj*) taosHashInit(10000000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
SHashObj* hash100m = (SHashObj*) taosHashInit(100000000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
char *name = (char*)taosMemoryCalloc(50000000, 9);
for (int64_t i = 0; i < 50000000; ++i) {
sprintf(name + i * 9, "t%08d", i);
}
for (int64_t i = 0; i < 50; ++i) {
taosHashPut(hash1h, name + i * 9, 9, &i, sizeof(i));
}
for (int64_t i = 0; i < 500; ++i) {
taosHashPut(hash1s, name + i * 9, 9, &i, sizeof(i));
}
for (int64_t i = 0; i < 5000; ++i) {
taosHashPut(hash10s, name + i * 9, 9, &i, sizeof(i));
}
for (int64_t i = 0; i < 50000; ++i) {
taosHashPut(hash100s, name + i * 9, 9, &i, sizeof(i));
}
for (int64_t i = 0; i < 500000; ++i) {
taosHashPut(hash1m, name + i * 9, 9, &i, sizeof(i));
}
for (int64_t i = 0; i < 5000000; ++i) {
taosHashPut(hash10m, name + i * 9, 9, &i, sizeof(i));
}
for (int64_t i = 0; i < 50000000; ++i) {
taosHashPut(hash100m, name + i * 9, 9, &i, sizeof(i));
}
int64_t start1h = taosGetTimestampMs();
int64_t start1hCt = taosHashGetCompTimes(hash1h);
for (int64_t i = 0; i < 10000000; ++i) {
ASSERT(taosHashGet(hash1h, name + (i % 50) * 9, 9));
}
int64_t end1h = taosGetTimestampMs();
int64_t end1hCt = taosHashGetCompTimes(hash1h);
int64_t start1s = taosGetTimestampMs();
int64_t start1sCt = taosHashGetCompTimes(hash1s);
for (int64_t i = 0; i < 10000000; ++i) {
ASSERT(taosHashGet(hash1s, name + (i % 500) * 9, 9));
}
int64_t end1s = taosGetTimestampMs();
int64_t end1sCt = taosHashGetCompTimes(hash1s);
int64_t start10s = taosGetTimestampMs();
int64_t start10sCt = taosHashGetCompTimes(hash10s);
for (int64_t i = 0; i < 10000000; ++i) {
ASSERT(taosHashGet(hash10s, name + (i % 5000) * 9, 9));
}
int64_t end10s = taosGetTimestampMs();
int64_t end10sCt = taosHashGetCompTimes(hash10s);
int64_t start100s = taosGetTimestampMs();
int64_t start100sCt = taosHashGetCompTimes(hash100s);
for (int64_t i = 0; i < 10000000; ++i) {
ASSERT(taosHashGet(hash100s, name + (i % 50000) * 9, 9));
}
int64_t end100s = taosGetTimestampMs();
int64_t end100sCt = taosHashGetCompTimes(hash100s);
int64_t start1m = taosGetTimestampMs();
int64_t start1mCt = taosHashGetCompTimes(hash1m);
for (int64_t i = 0; i < 10000000; ++i) {
ASSERT(taosHashGet(hash1m, name + (i % 500000) * 9, 9));
}
int64_t end1m = taosGetTimestampMs();
int64_t end1mCt = taosHashGetCompTimes(hash1m);
int64_t start10m = taosGetTimestampMs();
int64_t start10mCt = taosHashGetCompTimes(hash10m);
for (int64_t i = 0; i < 10000000; ++i) {
ASSERT(taosHashGet(hash10m, name + (i % 5000000) * 9, 9));
}
int64_t end10m = taosGetTimestampMs();
int64_t end10mCt = taosHashGetCompTimes(hash10m);
int64_t start100m = taosGetTimestampMs();
int64_t start100mCt = taosHashGetCompTimes(hash100m);
for (int64_t i = 0; i < 10000000; ++i) {
ASSERT(taosHashGet(hash100m, name + (i % 50000000) * 9, 9));
}
int64_t end100m = taosGetTimestampMs();
int64_t end100mCt = taosHashGetCompTimes(hash100m);
SArray *sArray[1000] = {0};
for (int64_t i = 0; i < 1000; ++i) {
sArray[i] = taosArrayInit(100000, 9);
}
int64_t cap = 4;
while (cap < 100000000) cap = (cap << 1u);
_hash_fn_t hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
int32_t slotR = cap / 1000 + 1;
for (int64_t i = 0; i < 10000000; ++i) {
char* p = name + (i % 50000000) * 9;
uint32_t v = (*hashFp)(p, 9);
taosArrayPush(sArray[(v%cap)/slotR], p);
}
SArray *slArray = taosArrayInit(100000000, 9);
for (int64_t i = 0; i < 1000; ++i) {
int32_t num = taosArrayGetSize(sArray[i]);
SArray* pArray = sArray[i];
for (int64_t m = 0; m < num; ++m) {
char* p = (char*)taosArrayGet(pArray, m);
ASSERT(taosArrayPush(slArray, p));
}
}
int64_t start100mS = taosGetTimestampMs();
int64_t start100mSCt = taosHashGetCompTimes(hash100m);
int32_t num = taosArrayGetSize(slArray);
for (int64_t i = 0; i < num; ++i) {
ASSERT(taosHashGet(hash100m, (char*)TARRAY_GET_ELEM(slArray, i), 9));
}
int64_t end100mS = taosGetTimestampMs();
int64_t end100mSCt = taosHashGetCompTimes(hash100m);
for (int64_t i = 0; i < 1000; ++i) {
taosArrayDestroy(sArray[i]);
}
taosArrayDestroy(slArray);
printf("1h \t %" PRId64 "ms,%" PRId64 "\n", end1h - start1h, end1hCt - start1hCt);
printf("1s \t %" PRId64 "ms,%" PRId64 "\n", end1s - start1s, end1sCt - start1sCt);
printf("10s \t %" PRId64 "ms,%" PRId64 "\n", end10s - start10s, end10sCt - start10sCt);
printf("100s \t %" PRId64 "ms,%" PRId64 "\n", end100s - start100s, end100sCt - start100sCt);
printf("1m \t %" PRId64 "ms,%" PRId64 "\n", end1m - start1m, end1mCt - start1mCt);
printf("10m \t %" PRId64 "ms,%" PRId64 "\n", end10m - start10m, end10mCt - start10mCt);
printf("100m \t %" PRId64 "ms,%" PRId64 "\n", end100m - start100m, end100mCt - start100mCt);
printf("100mS \t %" PRId64 "ms,%" PRId64 "\n", end100mS - start100mS, end100mSCt - start100mSCt);
taosHashCleanup(hash1h);
taosHashCleanup(hash1s);
taosHashCleanup(hash10s);
taosHashCleanup(hash100s);
taosHashCleanup(hash1m);
taosHashCleanup(hash10m);
taosHashCleanup(hash100m);
SHashObj *mhash[1000] = {0};
for (int64_t i = 0; i < 1000; ++i) {
mhash[i] = (SHashObj*) taosHashInit(100000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
}
for (int64_t i = 0; i < 50000000; ++i) {
#if 0
taosHashPut(mhash[i%1000], name + i * 9, 9, &i, sizeof(i));
#else
taosHashPut(mhash[i/50000], name + i * 9, 9, &i, sizeof(i));
#endif
}
int64_t startMhashCt = 0;
for (int64_t i = 0; i < 1000; ++i) {
startMhashCt += taosHashGetCompTimes(mhash[i]);
}
int64_t startMhash = taosGetTimestampMs();
#if 0
for (int32_t i = 0; i < 10000000; ++i) {
ASSERT(taosHashGet(mhash[i%1000], name + i * 9, 9));
}
#else
// for (int64_t i = 0; i < 10000000; ++i) {
for (int64_t i = 0; i < 50000000; i+=5) {
ASSERT(taosHashGet(mhash[i/50000], name + i * 9, 9));
}
#endif
int64_t endMhash = taosGetTimestampMs();
int64_t endMhashCt = 0;
for (int64_t i = 0; i < 1000; ++i) {
printf(" %" PRId64 , taosHashGetCompTimes(mhash[i]));
endMhashCt += taosHashGetCompTimes(mhash[i]);
}
printf("\n100m \t %" PRId64 "ms,%" PRId64 "\n", endMhash - startMhash, endMhashCt - startMhashCt);
for (int64_t i = 0; i < 1000; ++i) {
taosHashCleanup(mhash[i]);
}
}
}
int main(int argc, char** argv) {
@ -210,4 +405,5 @@ TEST(testCase, hashTest) {
noLockPerformanceTest();
multithreadsTest();
acquireRleaseTest();
//perfTest();
}