fix: insert tsma data
This commit is contained in:
parent
20a0362d2b
commit
0beed37cfd
|
@ -89,7 +89,7 @@ bool tsSmlDataFormat =
|
||||||
|
|
||||||
// query
|
// query
|
||||||
int32_t tsQueryPolicy = 1;
|
int32_t tsQueryPolicy = 1;
|
||||||
int32_t tsQuerySmaOptimize = 0;
|
int32_t tsQuerySmaOptimize = 1;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* denote if the server needs to compress response message at the application layer to client, including query rsp,
|
* denote if the server needs to compress response message at the application layer to client, including query rsp,
|
||||||
|
|
|
@ -489,7 +489,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
||||||
smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||||
ASSERT(smaObj.uid != 0);
|
ASSERT(smaObj.uid != 0);
|
||||||
char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
|
char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
|
||||||
snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "td.tsma.rst.tb.%s", pCreate->name);
|
snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb",pCreate->name);
|
||||||
memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
|
memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
|
||||||
smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN);
|
smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN);
|
||||||
smaObj.stbUid = pStb->uid;
|
smaObj.stbUid = pStb->uid;
|
||||||
|
@ -603,6 +603,9 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
||||||
if (mndPersistStream(pMnode, pTrans, &streamObj) != 0) goto _OVER;
|
if (mndPersistStream(pMnode, pTrans, &streamObj) != 0) goto _OVER;
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
||||||
|
mDebug("mndSma: create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d",
|
||||||
|
pCreate->name, smaObj.uid, smaObj.stbUid, smaObj.dstTbUid, smaObj.dstTbName, smaObj.dstVgId);
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
|
|
|
@ -381,6 +381,8 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
|
||||||
terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
|
terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
return -1;
|
return -1;
|
||||||
|
} else if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
|
|
||||||
|
|
|
@ -116,8 +116,10 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// create stable to save tsma result in dstVgId
|
// create stable to save tsma result in dstVgId
|
||||||
|
SName stbFullName = {0};
|
||||||
|
tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||||
SVCreateStbReq pReq = {0};
|
SVCreateStbReq pReq = {0};
|
||||||
pReq.name = pCfg->dstTbName;
|
pReq.name = (char*)tNameGetTableName(&stbFullName);
|
||||||
pReq.suid = pCfg->dstTbUid;
|
pReq.suid = pCfg->dstTbUid;
|
||||||
pReq.schemaRow = pCfg->schemaRow;
|
pReq.schemaRow = pCfg->schemaRow;
|
||||||
pReq.schemaTag = pCfg->schemaTag;
|
pReq.schemaTag = pCfg->schemaTag;
|
||||||
|
@ -125,6 +127,12 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
|
||||||
if (metaCreateSTable(SMA_META(pSma), version, &pReq) < 0) {
|
if (metaCreateSTable(SMA_META(pSma), version, &pReq) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
smaDebug("vgId:%d, success to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64
|
||||||
|
" dstTb:%s dstVg:%d",
|
||||||
|
SMA_VID(pSma), pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name, pCfg->dstVgId);
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -351,7 +351,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
// TODO: remove the function
|
// TODO: remove the function
|
||||||
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
||||||
// TODO
|
// TODO
|
||||||
|
|
||||||
blockDebugShowDataBlocks(data, __func__);
|
blockDebugShowDataBlocks(data, __func__);
|
||||||
tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
|
tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
|
||||||
}
|
}
|
||||||
|
@ -852,6 +851,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
||||||
if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
|
if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
|
||||||
if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
|
if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
|
||||||
submitBlkRsp.code = terrno;
|
submitBlkRsp.code = terrno;
|
||||||
|
pRsp->code = terrno;
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
taosArrayDestroy(createTbReq.ctb.tagName);
|
taosArrayDestroy(createTbReq.ctb.tagName);
|
||||||
goto _exit;
|
goto _exit;
|
||||||
|
|
Loading…
Reference in New Issue