Merge branch 'refact/submit_req' of https://github.com/taosdata/TDengine into refact/submit_req
This commit is contained in:
commit
6bb16fc1e7
|
@ -87,7 +87,7 @@ void qCleanupKeywordsTable();
|
|||
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash);
|
||||
int32_t qResetStmtDataBlock(STableDataCxt* block, bool keepBuf);
|
||||
int32_t qCloneStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, bool reset);
|
||||
int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_t uid, int32_t vgId, bool rebuildCreateTb);
|
||||
int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_t uid, uint64_t suid, int32_t vgId, bool rebuildCreateTb);
|
||||
void qDestroyStmtDataBlock(STableDataCxt* pBlock);
|
||||
STableMeta* qGetTableMetaInDataBlock(STableDataCxt* pDataBlock);
|
||||
int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData **pData);
|
||||
|
|
|
@ -240,6 +240,8 @@ int32_t stmtCacheBlock(STscStmt* pStmt) {
|
|||
}
|
||||
|
||||
int32_t stmtParseSql(STscStmt* pStmt) {
|
||||
pStmt->exec.pCurrBlock = NULL;
|
||||
|
||||
SStmtCallback stmtCb = {
|
||||
.pStmt = pStmt,
|
||||
.getTbNameFn = stmtGetTbName,
|
||||
|
@ -352,7 +354,7 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid) {
|
||||
int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid, uint64_t suid) {
|
||||
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
|
||||
SVgroupInfo vgInfo = {0};
|
||||
SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
|
||||
|
@ -364,7 +366,9 @@ int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataCxt* pDataBlock, STableD
|
|||
STMT_ERR_RET(
|
||||
taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo)));
|
||||
|
||||
STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, vgInfo.vgId, pStmt->sql.autoCreateTbl));
|
||||
STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, suid, vgInfo.vgId, pStmt->sql.autoCreateTbl));
|
||||
|
||||
STMT_DLOG("tableDataCxt rebuilt, uid:%" PRId64 ", vgId:%d", uid, vgInfo.vgId);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -410,7 +414,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
|||
pStmt->bInfo.tbUid = 0;
|
||||
|
||||
STableDataCxt* pNewBlock = NULL;
|
||||
STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0));
|
||||
STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0, pStmt->bInfo.tbSuid));
|
||||
|
||||
if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
|
||||
POINTER_BYTES)) {
|
||||
|
@ -490,7 +494,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
|||
pStmt->bInfo.tagsCached = true;
|
||||
|
||||
STableDataCxt* pNewBlock = NULL;
|
||||
STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid));
|
||||
STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid, suid));
|
||||
|
||||
if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
|
||||
POINTER_BYTES)) {
|
||||
|
@ -596,8 +600,6 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
|
|||
STMT_ERR_RET(stmtGetFromCache(pStmt));
|
||||
|
||||
if (pStmt->bInfo.needParse) {
|
||||
pStmt->exec.pCurrBlock = NULL;
|
||||
|
||||
strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1);
|
||||
pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;
|
||||
|
||||
|
|
|
@ -857,6 +857,7 @@ static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock,
|
|||
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
#if 1
|
||||
int32_t code = 0;
|
||||
terrno = 0;
|
||||
|
||||
SSubmitReq2 *pSubmitReq = &(SSubmitReq2){0};
|
||||
SSubmitRsp2 *pSubmitRsp = &(SSubmitRsp2){0};
|
||||
|
@ -892,6 +893,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
|||
code = metaGetInfo(pVnode->pMeta, pSubmitTbData->uid, &info, NULL);
|
||||
if (code) {
|
||||
code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
|
||||
vWarn("vgId:%d, table uid:%" PRId64 " not exists", TD_VID(pVnode), pSubmitTbData->uid);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
|
@ -1018,6 +1020,8 @@ _exit:
|
|||
tDestroySSubmitReq2(pSubmitReq, TSDB_MSG_FLG_DECODE);
|
||||
tDestroySSubmitRsp2(pSubmitRsp, TSDB_MSG_FLG_ENCODE);
|
||||
|
||||
if (code) terrno = code;
|
||||
|
||||
return code;
|
||||
|
||||
#else
|
||||
|
|
|
@ -467,7 +467,7 @@ int32_t qCloneStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, bool rese
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_t uid, int32_t vgId, bool rebuildCreateTb) {
|
||||
int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_t uid, uint64_t suid, int32_t vgId, bool rebuildCreateTb) {
|
||||
int32_t code = qCloneStmtDataBlock(pDst, pSrc, false);
|
||||
if (code) {
|
||||
return code;
|
||||
|
@ -477,8 +477,12 @@ int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_
|
|||
if (pBlock->pMeta) {
|
||||
pBlock->pMeta->uid = uid;
|
||||
pBlock->pMeta->vgId = vgId;
|
||||
pBlock->pMeta->suid = suid;
|
||||
}
|
||||
|
||||
pBlock->pData->suid = suid;
|
||||
pBlock->pData->uid = uid;
|
||||
|
||||
if (rebuildCreateTb && NULL == pBlock->pData->pCreateTbReq) {
|
||||
pBlock->pData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
|
||||
if (NULL == pBlock->pData->pCreateTbReq) {
|
||||
|
|
|
@ -1074,6 +1074,7 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
|
|||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pOutput = pTableCxt;
|
||||
qDebug("tableDataCxt created, uid:%" PRId64 ", vgId:%d", pTableMeta->uid, pTableMeta->vgId);
|
||||
} else {
|
||||
taosMemoryFree(pTableCxt);
|
||||
}
|
||||
|
@ -1182,6 +1183,8 @@ static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCx
|
|||
taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData);
|
||||
taosMemoryFreeClear(pTableCxt->pData);
|
||||
|
||||
qDebug("add tableDataCxt uid:%" PRId64 " to vgId:%d", pTableCxt->pMeta->uid, pVgCxt->vgId);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue