Merge pull request #14390 from taosdata/feature/TD-11274-3.0
refactor: assign the committed version
This commit is contained in:
commit
8cf9f9c71a
|
@ -113,7 +113,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
|||
if (!pDataBlocks) {
|
||||
terrno = TSDB_CODE_TSMA_INVALID_PTR;
|
||||
smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is NULL", SMA_VID(pSma));
|
||||
return terrno;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pDataBlocks) <= 0) {
|
||||
|
@ -129,7 +129,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
|||
|
||||
SSmaEnv *pEnv = SMA_TSMA_ENV(pSma);
|
||||
SSmaStat *pStat = NULL;
|
||||
STSmaStat *pItem = NULL;
|
||||
STSmaStat *pTsmaStat = NULL;
|
||||
|
||||
if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) {
|
||||
terrno = TSDB_CODE_TSMA_INVALID_STAT;
|
||||
|
@ -137,32 +137,43 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
|||
}
|
||||
|
||||
tdRefSmaStat(pSma, pStat);
|
||||
pItem = &pStat->tsmaStat;
|
||||
pTsmaStat = SMA_TSMA_STAT(pStat);
|
||||
|
||||
ASSERT(pItem);
|
||||
|
||||
if (!pItem->pTSma) {
|
||||
if (!pTsmaStat->pTSma) {
|
||||
STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid);
|
||||
if (!pTSma) {
|
||||
terrno = TSDB_CODE_TSMA_NO_INDEX_IN_META;
|
||||
smaWarn("vgId:%d, tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), indexUid, tstrerror(terrno));
|
||||
return TSDB_CODE_FAILED;
|
||||
smaError("vgId:%d, failed to get STSma while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
|
||||
indexUid, tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
pTsmaStat->pTSma = pTSma;
|
||||
pTsmaStat->pTSchema = metaGetTbTSchema(SMA_META(pSma), pTSma->dstTbUid, -1);
|
||||
if (!pTsmaStat->pTSchema) {
|
||||
smaError("vgId:%d, failed to get STSchema while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
|
||||
indexUid, tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
pItem->pTSma = pTSma;
|
||||
pItem->pTSchema = metaGetTbTSchema(SMA_META(pSma), pTSma->dstTbUid, -1);
|
||||
ASSERT(pItem->pTSchema); // TODO
|
||||
}
|
||||
|
||||
ASSERT(pItem->pTSma->indexUid == indexUid);
|
||||
if (pTsmaStat->pTSma->indexUid != indexUid) {
|
||||
terrno = TSDB_CODE_VND_APP_ERROR;
|
||||
smaError("vgId:%d, tsma insert for smaIndex %" PRIi64 "(!=%" PRIi64 ") failed since %s", SMA_VID(pSma), indexUid,
|
||||
pTsmaStat->pTSma->indexUid, tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
SSubmitReq *pSubmitReq = NULL;
|
||||
SSubmitReq *pSubmitReq = tdBlockToSubmit((const SArray *)msg, pTsmaStat->pTSchema, true, pTsmaStat->pTSma->dstTbUid,
|
||||
pTsmaStat->pTSma->dstTbName, pTsmaStat->pTSma->dstVgId);
|
||||
|
||||
pSubmitReq = tdBlockToSubmit((const SArray *)msg, pItem->pTSchema, true, pItem->pTSma->dstTbUid,
|
||||
pItem->pTSma->dstTbName, pItem->pTSma->dstVgId);
|
||||
if (!pSubmitReq) {
|
||||
smaError("vgId:%d, failed to gen submit blk while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
|
||||
indexUid, tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
ASSERT(pSubmitReq); // TODO
|
||||
|
||||
ASSERT(!strncasecmp("td.tsma.rst.tb", pItem->pTSma->dstTbName, 14));
|
||||
#if 0
|
||||
ASSERT(!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14));
|
||||
#endif
|
||||
|
||||
SRpcMsg submitReqMsg = {
|
||||
.msgType = TDMT_VND_SUBMIT,
|
||||
|
@ -170,9 +181,15 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
|||
.contLen = ntohl(pSubmitReq->length),
|
||||
};
|
||||
|
||||
ASSERT(tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &submitReqMsg) == 0);
|
||||
if (tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &submitReqMsg) < 0) {
|
||||
smaError("vgId:%d, failed to put SubmitReq msg while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
|
||||
indexUid, tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
tdUnRefSmaStat(pSma, pStat);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
tdUnRefSmaStat(pSma, pStat);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
|
@ -267,6 +267,8 @@ int vnodeCommit(SVnode *pVnode) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
pVnode->state.committed = info.state.committed;
|
||||
|
||||
// apply the commit (TODO)
|
||||
vnodeBufPoolReset(pVnode->onCommit);
|
||||
pVnode->onCommit->next = pVnode->pPool;
|
||||
|
|
Loading…
Reference in New Issue