refactor: do some internal refactor.
This commit is contained in:
parent
85afbf0d35
commit
66477a28ca
|
@ -42,7 +42,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
void* tqReader;
|
||||
void* meta;
|
||||
// void* meta;
|
||||
void* config;
|
||||
void* vnode;
|
||||
void* mnd;
|
||||
|
@ -54,7 +54,7 @@ typedef struct {
|
|||
int32_t numOfVgroups;
|
||||
void* sContext; // SSnapContext*
|
||||
|
||||
void* pStateBackend;
|
||||
void* pStateBackend;
|
||||
struct SStorageAPI api;
|
||||
} SReadHandle;
|
||||
|
||||
|
|
|
@ -243,6 +243,7 @@ typedef struct TsdReader {
|
|||
void (*tsdReaderNotifyClosing)();
|
||||
} TsdReader;
|
||||
|
||||
|
||||
/**
|
||||
* int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables);
|
||||
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
|
||||
|
@ -262,7 +263,6 @@ typedef struct SStoreCacheReader {
|
|||
|
||||
/*------------------------------------------------------------------------------------------------------------------*/
|
||||
/*
|
||||
*
|
||||
void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
|
||||
int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList);
|
||||
int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
|
||||
|
@ -552,7 +552,7 @@ typedef struct SStateStore {
|
|||
|
||||
typedef struct SStorageAPI {
|
||||
SStoreMeta metaFn; // todo: refactor
|
||||
TsdReader tsdReader;
|
||||
TsdReader tsdReader;
|
||||
SStoreMetaReader metaReaderFn;
|
||||
SStoreCacheReader cacheFn;
|
||||
SStoreSnapshotFn snapshotFn;
|
||||
|
|
|
@ -19,23 +19,23 @@
|
|||
#include "vnode.h"
|
||||
#include "vnodeInt.h"
|
||||
|
||||
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
|
||||
static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, int64_t ctime, int64_t *pUid) {
|
||||
int32_t code = 0;
|
||||
|
@ -301,32 +301,32 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
|
||||
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg *pRsp) {
|
||||
void *ptr = NULL;
|
||||
void *pReq;
|
||||
int32_t len;
|
||||
int32_t ret;
|
||||
|
||||
if (version <= pVnode->state.applied) {
|
||||
vError("vgId:%d, duplicate write request. version: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), version,
|
||||
if (ver <= pVnode->state.applied) {
|
||||
vError("vgId:%d, duplicate write request. ver: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), ver,
|
||||
pVnode->state.applied);
|
||||
terrno = TSDB_CODE_VND_DUP_REQUEST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
|
||||
version);
|
||||
ver);
|
||||
|
||||
ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
|
||||
ASSERT(pVnode->state.applied + 1 == version);
|
||||
ASSERT(pVnode->state.applied + 1 == ver);
|
||||
|
||||
atomic_store_64(&pVnode->state.applied, version);
|
||||
atomic_store_64(&pVnode->state.applied, ver);
|
||||
atomic_store_64(&pVnode->state.applyTerm, pMsg->info.conn.applyTerm);
|
||||
|
||||
if (!syncUtilUserCommit(pMsg->msgType)) goto _exit;
|
||||
|
||||
if (pMsg->msgType == TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE || pMsg->msgType == TDMT_STREAM_TASK_CHECK_RSP) {
|
||||
if (tqCheckLogInWal(pVnode->pTq, version)) return 0;
|
||||
if (tqCheckLogInWal(pVnode->pTq, ver)) return 0;
|
||||
}
|
||||
|
||||
// skip header
|
||||
|
@ -337,123 +337,123 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
|||
switch (pMsg->msgType) {
|
||||
/* META */
|
||||
case TDMT_VND_CREATE_STB:
|
||||
if (vnodeProcessCreateStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
if (vnodeProcessCreateStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
|
||||
break;
|
||||
case TDMT_VND_ALTER_STB:
|
||||
if (vnodeProcessAlterStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
if (vnodeProcessAlterStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
|
||||
break;
|
||||
case TDMT_VND_DROP_STB:
|
||||
if (vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
if (vnodeProcessDropStbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
|
||||
break;
|
||||
case TDMT_VND_CREATE_TABLE:
|
||||
if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
if (vnodeProcessCreateTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
|
||||
break;
|
||||
case TDMT_VND_ALTER_TABLE:
|
||||
if (vnodeProcessAlterTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
if (vnodeProcessAlterTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
|
||||
break;
|
||||
case TDMT_VND_DROP_TABLE:
|
||||
if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
if (vnodeProcessDropTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
|
||||
break;
|
||||
case TDMT_VND_DROP_TTL_TABLE:
|
||||
if (vnodeProcessDropTtlTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
if (vnodeProcessDropTtlTbReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
|
||||
break;
|
||||
case TDMT_VND_TRIM:
|
||||
if (vnodeProcessTrimReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
if (vnodeProcessTrimReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
|
||||
break;
|
||||
case TDMT_VND_CREATE_SMA:
|
||||
if (vnodeProcessCreateTSmaReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
if (vnodeProcessCreateTSmaReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
|
||||
break;
|
||||
/* TSDB */
|
||||
case TDMT_VND_SUBMIT:
|
||||
if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
|
||||
if (vnodeProcessSubmitReq(pVnode, ver, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
|
||||
break;
|
||||
case TDMT_VND_DELETE:
|
||||
if (vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
if (vnodeProcessDeleteReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
|
||||
break;
|
||||
case TDMT_VND_BATCH_DEL:
|
||||
if (vnodeProcessBatchDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
if (vnodeProcessBatchDeleteReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err;
|
||||
break;
|
||||
/* TQ */
|
||||
case TDMT_VND_TMQ_SUBSCRIBE:
|
||||
if (tqProcessSubscribeReq(pVnode->pTq, version, pReq, len) < 0) {
|
||||
if (tqProcessSubscribeReq(pVnode->pTq, ver, pReq, len) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
break;
|
||||
case TDMT_VND_TMQ_DELETE_SUB:
|
||||
if (tqProcessDeleteSubReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) {
|
||||
if (tqProcessDeleteSubReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
break;
|
||||
case TDMT_VND_TMQ_COMMIT_OFFSET:
|
||||
if (tqProcessOffsetCommitReq(pVnode->pTq, version, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||
if (tqProcessOffsetCommitReq(pVnode->pTq, ver, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
break;
|
||||
case TDMT_VND_TMQ_SEEK_TO_OFFSET:
|
||||
if (tqProcessSeekReq(pVnode->pTq, version, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||
if (tqProcessSeekReq(pVnode->pTq, ver, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
break;
|
||||
case TDMT_VND_TMQ_ADD_CHECKINFO:
|
||||
if (tqProcessAddCheckInfoReq(pVnode->pTq, version, pReq, len) < 0) {
|
||||
if (tqProcessAddCheckInfoReq(pVnode->pTq, ver, pReq, len) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
break;
|
||||
case TDMT_VND_TMQ_DEL_CHECKINFO:
|
||||
if (tqProcessDelCheckInfoReq(pVnode->pTq, version, pReq, len) < 0) {
|
||||
if (tqProcessDelCheckInfoReq(pVnode->pTq, ver, pReq, len) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
break;
|
||||
case TDMT_STREAM_TASK_DEPLOY: {
|
||||
if (pVnode->restored && tqProcessTaskDeployReq(pVnode->pTq, version, pReq, len) < 0) {
|
||||
if (pVnode->restored && tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
} break;
|
||||
case TDMT_STREAM_TASK_DROP: {
|
||||
if (tqProcessTaskDropReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) {
|
||||
if (tqProcessTaskDropReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
} break;
|
||||
case TDMT_STREAM_TASK_PAUSE: {
|
||||
if (pVnode->restored && tqProcessTaskPauseReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) {
|
||||
if (pVnode->restored && tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
} break;
|
||||
case TDMT_STREAM_TASK_RESUME: {
|
||||
if (pVnode->restored && tqProcessTaskResumeReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) {
|
||||
if (pVnode->restored && tqProcessTaskResumeReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
} break;
|
||||
case TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE: {
|
||||
if (tqProcessTaskRecover2Req(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) {
|
||||
if (tqProcessTaskRecover2Req(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
} break;
|
||||
case TDMT_STREAM_TASK_CHECK_RSP: {
|
||||
if (tqProcessStreamTaskCheckRsp(pVnode->pTq, version, pReq, len) < 0) {
|
||||
if (tqProcessStreamTaskCheckRsp(pVnode->pTq, ver, pReq, len) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
} break;
|
||||
case TDMT_VND_ALTER_CONFIRM:
|
||||
needCommit = pVnode->config.hashChange;
|
||||
if (vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp) < 0) {
|
||||
if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
break;
|
||||
case TDMT_VND_ALTER_CONFIG:
|
||||
vnodeProcessAlterConfigReq(pVnode, version, pReq, len, pRsp);
|
||||
vnodeProcessAlterConfigReq(pVnode, ver, pReq, len, pRsp);
|
||||
break;
|
||||
case TDMT_VND_COMMIT:
|
||||
needCommit = true;
|
||||
break;
|
||||
case TDMT_VND_CREATE_INDEX:
|
||||
vnodeProcessCreateIndexReq(pVnode, version, pReq, len, pRsp);
|
||||
vnodeProcessCreateIndexReq(pVnode, ver, pReq, len, pRsp);
|
||||
break;
|
||||
case TDMT_VND_DROP_INDEX:
|
||||
vnodeProcessDropIndexReq(pVnode, version, pReq, len, pRsp);
|
||||
vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp);
|
||||
break;
|
||||
case TDMT_VND_COMPACT:
|
||||
vnodeProcessCompactVnodeReq(pVnode, version, pReq, len, pRsp);
|
||||
vnodeProcessCompactVnodeReq(pVnode, ver, pReq, len, pRsp);
|
||||
goto _exit;
|
||||
default:
|
||||
vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType);
|
||||
|
@ -461,18 +461,18 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
|||
}
|
||||
|
||||
vTrace("vgId:%d, process %s request, code:0x%x index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pRsp->code,
|
||||
version);
|
||||
ver);
|
||||
|
||||
walApplyVer(pVnode->pWal, version);
|
||||
walApplyVer(pVnode->pWal, ver);
|
||||
|
||||
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
|
||||
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, ver) < 0) {
|
||||
vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
// commit if need
|
||||
if (needCommit) {
|
||||
vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
|
||||
vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), ver);
|
||||
if (vnodeAsyncCommit(pVnode) < 0) {
|
||||
vError("vgId:%d, failed to vnode async commit since %s.", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
|
@ -489,8 +489,8 @@ _exit:
|
|||
return 0;
|
||||
|
||||
_err:
|
||||
vError("vgId:%d, process %s request failed since %s, version:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
|
||||
tstrerror(terrno), version);
|
||||
vError("vgId:%d, process %s request failed since %s, ver:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
|
||||
tstrerror(terrno), ver);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -514,7 +514,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
||||
SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
||||
switch (pMsg->msgType) {
|
||||
case TDMT_SCH_QUERY:
|
||||
case TDMT_SCH_MERGE_QUERY:
|
||||
|
@ -603,7 +603,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
|
|||
}
|
||||
|
||||
extern int32_t vnodeAsyncRentention(SVnode *pVnode, int64_t now);
|
||||
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
int32_t code = 0;
|
||||
SVTrimDbReq trimReq = {0};
|
||||
|
||||
|
@ -624,7 +624,7 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
|
||||
if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
|
@ -650,7 +650,7 @@ end:
|
|||
return ret;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
SVCreateStbReq req = {0};
|
||||
SDecoder coder;
|
||||
|
||||
|
@ -667,7 +667,7 @@ static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *p
|
|||
goto _err;
|
||||
}
|
||||
|
||||
if (metaCreateSTable(pVnode->pMeta, version, &req) < 0) {
|
||||
if (metaCreateSTable(pVnode->pMeta, ver, &req) < 0) {
|
||||
pRsp->code = terrno;
|
||||
goto _err;
|
||||
}
|
||||
|
@ -685,7 +685,7 @@ _err:
|
|||
return -1;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
SDecoder decoder = {0};
|
||||
SEncoder encoder = {0};
|
||||
int32_t rcode = 0;
|
||||
|
@ -742,7 +742,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
|
|||
}
|
||||
|
||||
// do create table
|
||||
if (metaCreateTable(pVnode->pMeta, version, pCreateReq, &cRsp.pMeta) < 0) {
|
||||
if (metaCreateTable(pVnode->pMeta, ver, pCreateReq, &cRsp.pMeta) < 0) {
|
||||
if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
|
||||
cRsp.code = TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
|
@ -790,7 +790,7 @@ _exit:
|
|||
return rcode;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
SVCreateStbReq req = {0};
|
||||
SDecoder dc = {0};
|
||||
|
||||
|
@ -808,7 +808,7 @@ static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pR
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (metaAlterSTable(pVnode->pMeta, version, &req) < 0) {
|
||||
if (metaAlterSTable(pVnode->pMeta, ver, &req) < 0) {
|
||||
pRsp->code = terrno;
|
||||
tDecoderClear(&dc);
|
||||
return -1;
|
||||
|
@ -819,7 +819,7 @@ static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pR
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
SVDropStbReq req = {0};
|
||||
int32_t rcode = TSDB_CODE_SUCCESS;
|
||||
SDecoder decoder = {0};
|
||||
|
@ -839,7 +839,7 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe
|
|||
// process request
|
||||
tbUidList = taosArrayInit(8, sizeof(int64_t));
|
||||
if (tbUidList == NULL) goto _exit;
|
||||
if (metaDropSTable(pVnode->pMeta, version, &req, tbUidList) < 0) {
|
||||
if (metaDropSTable(pVnode->pMeta, ver, &req, tbUidList) < 0) {
|
||||
rcode = terrno;
|
||||
goto _exit;
|
||||
}
|
||||
|
@ -862,7 +862,7 @@ _exit:
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
SVAlterTbReq vAlterTbReq = {0};
|
||||
SVAlterTbRsp vAlterTbRsp = {0};
|
||||
SDecoder dc = {0};
|
||||
|
@ -887,7 +887,7 @@ static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pRe
|
|||
}
|
||||
|
||||
// process
|
||||
if (metaAlterTable(pVnode->pMeta, version, &vAlterTbReq, &vMetaRsp) < 0) {
|
||||
if (metaAlterTable(pVnode->pMeta, ver, &vAlterTbReq, &vMetaRsp) < 0) {
|
||||
vAlterTbRsp.code = terrno;
|
||||
tDecoderClear(&dc);
|
||||
rcode = -1;
|
||||
|
@ -912,7 +912,7 @@ _exit:
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
SVDropTbBatchReq req = {0};
|
||||
SVDropTbBatchRsp rsp = {0};
|
||||
SDecoder decoder = {0};
|
||||
|
@ -946,7 +946,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq
|
|||
tb_uid_t tbUid = 0;
|
||||
|
||||
/* code */
|
||||
ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids, &tbUid);
|
||||
ret = metaDropTable(pVnode->pMeta, ver, pDropTbReq, tbUids, &tbUid);
|
||||
if (ret < 0) {
|
||||
if (pDropTbReq->igNotExists && terrno == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
|
||||
dropTbRsp.code = TSDB_CODE_SUCCESS;
|
||||
|
@ -1189,7 +1189,7 @@ static int32_t vnodeRebuildSubmitReqMsg(SSubmitReq2 *pSubmitReq, void **ppMsg) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
int32_t code = 0;
|
||||
terrno = 0;
|
||||
|
||||
|
@ -1203,7 +1203,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
|||
|
||||
void *pAllocMsg = NULL;
|
||||
SSubmitReq2Msg *pMsg = (SSubmitReq2Msg *)pReq;
|
||||
if (0 == pMsg->version) {
|
||||
if (0 == pMsg->ver) {
|
||||
code = vnodeSubmitReqConvertToSubmitReq2(pVnode, (SSubmitReq *)pMsg, pSubmitReq);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = vnodeRebuildSubmitReqMsg(pSubmitReq, &pReq);
|
||||
|
@ -1251,7 +1251,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
|||
for (int32_t iRow = 0; iRow < pColData->nVal; iRow++) {
|
||||
if (aKey[iRow] < minKey || aKey[iRow] > maxKey || (iRow > 0 && aKey[iRow] <= aKey[iRow - 1])) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), version);
|
||||
vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), ver);
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
|
@ -1263,7 +1263,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
|||
for (int32_t iRow = 0; iRow < nRow; ++iRow) {
|
||||
if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey || (iRow > 0 && aRow[iRow]->ts <= aRow[iRow - 1]->ts)) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), version);
|
||||
vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), ver);
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
|
@ -1350,7 +1350,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
|||
SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pSubmitRsp->aCreateTbRsp, 1);
|
||||
|
||||
// create table
|
||||
if (metaCreateTable(pVnode->pMeta, version, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) == 0) {
|
||||
if (metaCreateTable(pVnode->pMeta, ver, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) == 0) {
|
||||
// create table success
|
||||
|
||||
if (newTbUids == NULL &&
|
||||
|
@ -1376,7 +1376,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
|||
|
||||
// insert data
|
||||
int32_t affectedRows;
|
||||
code = tsdbInsertTableData(pVnode->pTsdb, version, pSubmitTbData, &affectedRows);
|
||||
code = tsdbInsertTableData(pVnode->pTsdb, ver, pSubmitTbData, &affectedRows);
|
||||
if (code) goto _exit;
|
||||
|
||||
pSubmitRsp->affectedRows += affectedRows;
|
||||
|
@ -1404,7 +1404,7 @@ _exit:
|
|||
atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);
|
||||
if (code == 0) {
|
||||
atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1);
|
||||
tdProcessRSmaSubmit(pVnode->pSma, version, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT);
|
||||
tdProcessRSmaSubmit(pVnode->pSma, ver, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT);
|
||||
}
|
||||
|
||||
// clear
|
||||
|
@ -1419,7 +1419,7 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
SVCreateTSmaReq req = {0};
|
||||
SDecoder coder = {0};
|
||||
|
||||
|
@ -1439,20 +1439,20 @@ static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *
|
|||
goto _err;
|
||||
}
|
||||
|
||||
if (tdProcessTSmaCreate(pVnode->pSma, version, (const char *)&req) < 0) {
|
||||
if (tdProcessTSmaCreate(pVnode->pSma, ver, (const char *)&req) < 0) {
|
||||
if (pRsp) pRsp->code = terrno;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
tDecoderClear(&coder);
|
||||
vDebug("vgId:%d, success to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64, TD_VID(pVnode),
|
||||
req.indexName, req.indexUid, version, req.tableUid);
|
||||
req.indexName, req.indexUid, ver, req.tableUid);
|
||||
return 0;
|
||||
|
||||
_err:
|
||||
tDecoderClear(&coder);
|
||||
vError("vgId:%d, failed to create tsma %s:%" PRIi64 " version %" PRIi64 "for table %" PRIi64 " since %s",
|
||||
TD_VID(pVnode), req.indexName, req.indexUid, version, req.tableUid, terrstr());
|
||||
TD_VID(pVnode), req.indexName, req.indexUid, ver, req.tableUid, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -1468,28 +1468,28 @@ int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) {
|
|||
return vnodeProcessCreateTSmaReq(pVnode, 1, pCont, contLen, NULL);
|
||||
}
|
||||
|
||||
static int32_t vnodeConsolidateAlterHashRange(SVnode *pVnode, int64_t version) {
|
||||
static int32_t vnodeConsolidateAlterHashRange(SVnode *pVnode, int64_t ver) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
vInfo("vgId:%d, trim meta of tables per hash range [%" PRIu32 ", %" PRIu32 "]. apply-index:%" PRId64, TD_VID(pVnode),
|
||||
pVnode->config.hashBegin, pVnode->config.hashEnd, version);
|
||||
pVnode->config.hashBegin, pVnode->config.hashEnd, ver);
|
||||
|
||||
// TODO: trim meta of tables from TDB per hash range [pVnode->config.hashBegin, pVnode->config.hashEnd]
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
vInfo("vgId:%d, vnode handle msgType:alter-confirm, alter confim msg is processed", TD_VID(pVnode));
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (!pVnode->config.hashChange) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
code = vnodeConsolidateAlterHashRange(pVnode, version);
|
||||
code = vnodeConsolidateAlterHashRange(pVnode, ver);
|
||||
if (code < 0) {
|
||||
vError("vgId:%d, failed to consolidate alter hashrange since %s. version:%" PRId64, TD_VID(pVnode), terrstr(),
|
||||
version);
|
||||
ver);
|
||||
goto _exit;
|
||||
}
|
||||
pVnode->config.hashChange = false;
|
||||
|
@ -1503,7 +1503,7 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
bool walChanged = false;
|
||||
bool tsdbChanged = false;
|
||||
|
||||
|
@ -1606,7 +1606,7 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
SBatchDeleteReq deleteReq;
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, pReq, len);
|
||||
|
@ -1626,7 +1626,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void
|
|||
|
||||
int64_t uid = mr.me.uid;
|
||||
|
||||
int32_t code = tsdbDeleteTableData(pVnode->pTsdb, version, deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
|
||||
int32_t code = tsdbDeleteTableData(pVnode->pTsdb, ver, deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
|
||||
if (code < 0) {
|
||||
terrno = code;
|
||||
vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64,
|
||||
|
@ -1640,7 +1640,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
int32_t code = 0;
|
||||
SDecoder *pCoder = &(SDecoder){0};
|
||||
SDeleteRes *pRes = &(SDeleteRes){0};
|
||||
|
@ -1661,7 +1661,7 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq
|
|||
ASSERT(taosArrayGetSize(pRes->uidList) == 0 || (pRes->skey != 0 && pRes->ekey != 0));
|
||||
|
||||
for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
|
||||
code = tsdbDeleteTableData(pVnode->pTsdb, version, pRes->suid, *(uint64_t *)taosArrayGet(pRes->uidList, iUid),
|
||||
code = tsdbDeleteTableData(pVnode->pTsdb, ver, pRes->suid, *(uint64_t *)taosArrayGet(pRes->uidList, iUid),
|
||||
pRes->skey, pRes->ekey);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
@ -1682,7 +1682,7 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq
|
|||
_err:
|
||||
return code;
|
||||
}
|
||||
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
SVCreateStbReq req = {0};
|
||||
SDecoder dc = {0};
|
||||
|
||||
|
@ -1698,7 +1698,7 @@ static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t version, void
|
|||
tDecoderClear(&dc);
|
||||
return -1;
|
||||
}
|
||||
if (metaAddIndexToSTable(pVnode->pMeta, version, &req) < 0) {
|
||||
if (metaAddIndexToSTable(pVnode->pMeta, ver, &req) < 0) {
|
||||
pRsp->code = terrno;
|
||||
goto _err;
|
||||
}
|
||||
|
@ -1708,7 +1708,7 @@ _err:
|
|||
tDecoderClear(&dc);
|
||||
return -1;
|
||||
}
|
||||
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
SDropIndexReq req = {0};
|
||||
pRsp->msgType = TDMT_VND_DROP_INDEX_RSP;
|
||||
pRsp->code = TSDB_CODE_SUCCESS;
|
||||
|
@ -1720,21 +1720,21 @@ static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t version, void *p
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (metaDropIndexFromSTable(pVnode->pMeta, version, &req) < 0) {
|
||||
if (metaDropIndexFromSTable(pVnode->pMeta, ver, &req) < 0) {
|
||||
pRsp->code = terrno;
|
||||
return -1;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
extern int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
extern int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
|
||||
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
return vnodeProcessCompactVnodeReqImpl(pVnode, version, pReq, len, pRsp);
|
||||
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
return vnodeProcessCompactVnodeReqImpl(pVnode, ver, pReq, len, pRsp);
|
||||
}
|
||||
|
||||
#ifndef TD_ENTERPRISE
|
||||
int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -2055,7 +2055,7 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
|
|||
pTableListInfo->numOfOuputGroups = 1;
|
||||
}
|
||||
} else {
|
||||
code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo, digest, pAPI);
|
||||
code = getColInfoResultForGroupby(pHandle->vnode, group, pTableListInfo, digest, pAPI);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -340,7 +340,7 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
|
|||
|
||||
// let's discard the tables those are not created according to the queried super table.
|
||||
SMetaReader mr = {0};
|
||||
pAPI->metaReaderFn.initReader(&mr, pScanInfo->readHandle.meta, 0);
|
||||
pAPI->metaReaderFn.initReader(&mr, pScanInfo->readHandle.vnode, 0);
|
||||
for (int32_t i = 0; i < numOfUids; ++i) {
|
||||
uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
|
||||
|
||||
|
@ -372,7 +372,7 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
|
|||
if (pScanInfo->pTagCond != NULL) {
|
||||
bool qualified = false;
|
||||
STableKeyInfo info = {.groupId = 0, .uid = mr.me.uid};
|
||||
code = isQualifiedTable(&info, pScanInfo->pTagCond, pScanInfo->readHandle.meta, &qualified, pAPI);
|
||||
code = isQualifiedTable(&info, pScanInfo->pTagCond, pScanInfo->readHandle.vnode, &qualified, pAPI);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to filter new table, uid:0x%" PRIx64 ", %s", info.uid, idstr);
|
||||
continue;
|
||||
|
@ -437,7 +437,7 @@ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableI
|
|||
if (assignUid) {
|
||||
keyInfo.groupId = keyInfo.uid;
|
||||
} else {
|
||||
code = getGroupIdFromTagsVal(pScanInfo->readHandle.meta, keyInfo.uid, pScanInfo->pGroupTags, keyBuf,
|
||||
code = getGroupIdFromTagsVal(pScanInfo->readHandle.vnode, keyInfo.uid, pScanInfo->pGroupTags, keyBuf,
|
||||
&keyInfo.groupId, &pTaskInfo->storageAPI);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosMemoryFree(keyBuf);
|
||||
|
|
|
@ -122,7 +122,7 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo
|
|||
|
||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
|
||||
pAPI->metaReaderFn.initReader(&mr, pHandle->meta, 0);
|
||||
pAPI->metaReaderFn.initReader(&mr, pHandle->vnode, 0);
|
||||
int32_t code = pAPI->metaReaderFn.readerGetEntryGetUidCache(&mr, pScanNode->uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid,
|
||||
|
|
|
@ -531,7 +531,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
|||
|
||||
// 1. check if it is existed in meta cache
|
||||
if (pCache == NULL) {
|
||||
pHandle->api.metaReaderFn.initReader(&mr, pHandle->meta, 0);
|
||||
pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0);
|
||||
code = pHandle->api.metaReaderFn.readerGetEntryGetUidCache(&mr, pBlock->info.id.uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// when encounter the TSDB_CODE_PAR_TABLE_NOT_EXIST error, we proceed.
|
||||
|
@ -560,7 +560,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
|||
|
||||
h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
|
||||
if (h == NULL) {
|
||||
pHandle->api.metaReaderFn.initReader(&mr, pHandle->meta, 0);
|
||||
pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0);
|
||||
code = pHandle->api.metaReaderFn.readerGetEntryGetUidCache(&mr, pBlock->info.id.uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||
|
@ -2556,7 +2556,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
|||
char str[512] = {0};
|
||||
int32_t count = 0;
|
||||
SMetaReader mr = {0};
|
||||
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.meta, 0);
|
||||
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, 0);
|
||||
|
||||
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
|
||||
doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI);
|
||||
|
@ -3396,7 +3396,7 @@ static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountSca
|
|||
pRes->info.id.groupId = groupId;
|
||||
|
||||
int64_t dbTableCount = 0;
|
||||
pAPI->metaFn.storeGetBasicInfo(pInfo->readHandle.meta, &dbTableCount);
|
||||
pAPI->metaFn.storeGetBasicInfo(pInfo->readHandle.vnode, &dbTableCount);
|
||||
fillTableCountScanDataBlock(pSupp, dbName, "", dbTableCount, pRes);
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
|
@ -3410,18 +3410,18 @@ static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanO
|
|||
if (strlen(pSupp->dbNameFilter) != 0) {
|
||||
if (strlen(pSupp->stbNameFilter) != 0) {
|
||||
tb_uid_t uid = 0;
|
||||
pAPI->metaFn.getTableUidByName(pInfo->readHandle.meta, pSupp->stbNameFilter, &uid);
|
||||
pAPI->metaFn.getTableUidByName(pInfo->readHandle.vnode, pSupp->stbNameFilter, &uid);
|
||||
SMetaStbStats stats = {0};
|
||||
ASSERT(0);
|
||||
// metaGetStbStats(pInfo->readHandle.meta, uid, &stats);
|
||||
// metaGetStbStats(pInfo->readHandle.vnode, uid, &stats);
|
||||
int64_t ctbNum = stats.ctbNum;
|
||||
fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, ctbNum, pRes);
|
||||
} else {
|
||||
int64_t tbNumVnode = 0;//metaGetTbNum(pInfo->readHandle.meta);
|
||||
int64_t tbNumVnode = 0;//metaGetTbNum(pInfo->readHandle.vnode);
|
||||
fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
|
||||
}
|
||||
} else {
|
||||
int64_t tbNumVnode = 0;//metaGetTbNum(pInfo->readHandle.meta);
|
||||
int64_t tbNumVnode = 0;//metaGetTbNum(pInfo->readHandle.vnode);
|
||||
pAPI->metaFn.storeGetBasicInfo(pInfo->readHandle.vnode);
|
||||
fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
|
||||
}
|
||||
|
@ -3437,7 +3437,7 @@ static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, S
|
|||
|
||||
uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
|
||||
pRes->info.id.groupId = groupId;
|
||||
int64_t ntbNum = 0;//metaGetNtbNum(pInfo->readHandle.meta);
|
||||
int64_t ntbNum = 0;//metaGetNtbNum(pInfo->readHandle.vnode);
|
||||
ASSERT(0);
|
||||
if (ntbNum != 0) {
|
||||
fillTableCountScanDataBlock(pSupp, dbName, "", ntbNum, pRes);
|
||||
|
@ -3448,7 +3448,7 @@ static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, S
|
|||
SSDataBlock* pRes, char* dbName, tb_uid_t stbUid) {
|
||||
char stbName[TSDB_TABLE_NAME_LEN] = {0};
|
||||
ASSERT(0);
|
||||
// metaGetTableSzNameByUid(pInfo->readHandle.meta, stbUid, stbName);
|
||||
// metaGetTableSzNameByUid(pInfo->readHandle.vnode, stbUid, stbName);
|
||||
|
||||
char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
if (pSupp->groupByDbName) {
|
||||
|
@ -3461,7 +3461,7 @@ static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, S
|
|||
pRes->info.id.groupId = groupId;
|
||||
|
||||
SMetaStbStats stats = {0};
|
||||
// metaGetStbStats(pInfo->readHandle.meta, stbUid, &stats);
|
||||
// metaGetStbStats(pInfo->readHandle.vnode, stbUid, &stats);
|
||||
int64_t ctbNum = stats.ctbNum;
|
||||
|
||||
fillTableCountScanDataBlock(pSupp, dbName, stbName, ctbNum, pRes);
|
||||
|
|
|
@ -466,7 +466,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
|
|||
STR_TO_VARSTR(tableName, pInfo->req.filterTb);
|
||||
|
||||
SMetaReader smrTable = {0};
|
||||
pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.meta, 0);
|
||||
pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, 0);
|
||||
int32_t code = pAPI->metaReaderFn.getTableEntryByName(&smrTable, pInfo->req.filterTb);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly
|
||||
|
@ -486,7 +486,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
|
|||
if (smrTable.me.type == TSDB_CHILD_TABLE) {
|
||||
int64_t suid = smrTable.me.ctbEntry.suid;
|
||||
pAPI->metaReaderFn.clearReader(&smrTable);
|
||||
pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.meta, 0);
|
||||
pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, 0);
|
||||
code = pAPI->metaReaderFn.getTableEntryByUid(&smrTable, suid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly
|
||||
|
@ -522,7 +522,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
|
|||
|
||||
int32_t ret = 0;
|
||||
if (pInfo->pCur == NULL) {
|
||||
pInfo->pCur = pAPI->metaFn.openMetaCursor(pInfo->readHandle.meta);
|
||||
pInfo->pCur = pAPI->metaFn.openMetaCursor(pInfo->readHandle.vnode);
|
||||
}
|
||||
|
||||
if (pInfo->pSchema == NULL) {
|
||||
|
@ -569,7 +569,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
|
|||
schemaRow = *(SSchemaWrapper**)schema;
|
||||
} else {
|
||||
SMetaReader smrSuperTable = {0};
|
||||
pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.meta, 0);
|
||||
pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, 0);
|
||||
int code = pAPI->metaReaderFn.getTableEntryByUid(&smrSuperTable, suid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly
|
||||
|
@ -658,7 +658,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
|||
STR_TO_VARSTR(tableName, condTableName);
|
||||
|
||||
SMetaReader smrChildTable = {0};
|
||||
pAPI->metaReaderFn.initReader(&smrChildTable, pInfo->readHandle.meta, 0);
|
||||
pAPI->metaReaderFn.initReader(&smrChildTable, pInfo->readHandle.vnode, 0);
|
||||
int32_t code = pAPI->metaReaderFn.getTableEntryByName(&smrChildTable, condTableName);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly
|
||||
|
@ -676,7 +676,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
SMetaReader smrSuperTable = {0};
|
||||
pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.meta, META_READER_NOLOCK);
|
||||
pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, META_READER_NOLOCK);
|
||||
code = pAPI->metaReaderFn.getTableEntryByUid(&smrSuperTable, smrChildTable.me.ctbEntry.suid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// terrno has been set by pAPI->metaReaderFn.getTableEntryByUid
|
||||
|
@ -702,7 +702,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
|||
|
||||
int32_t ret = 0;
|
||||
if (pInfo->pCur == NULL) {
|
||||
pInfo->pCur = pAPI->metaFn.openMetaCursor(pInfo->readHandle.meta);
|
||||
pInfo->pCur = pAPI->metaFn.openMetaCursor(pInfo->readHandle.vnode);
|
||||
}
|
||||
|
||||
bool blockFull = false;
|
||||
|
@ -715,7 +715,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
|||
STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);
|
||||
|
||||
SMetaReader smrSuperTable = {0};
|
||||
pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.meta, 0);
|
||||
pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, 0);
|
||||
uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
|
||||
int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&smrSuperTable, suid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1131,7 +1131,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
|
|||
tb_uid_t* uid = taosArrayGet(pIdx->uids, i);
|
||||
|
||||
SMetaReader mr = {0};
|
||||
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.meta, 0);
|
||||
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, 0);
|
||||
ret = pAPI->metaReaderFn.getTableEntryByUid(&mr, *uid);
|
||||
if (ret < 0) {
|
||||
pAPI->metaReaderFn.clearReader(&mr);
|
||||
|
@ -1159,7 +1159,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
|
|||
colDataSetVal(pColInfoData, numOfRows, (char*)&ts, false);
|
||||
|
||||
SMetaReader mr1 = {0};
|
||||
pAPI->metaReaderFn.initReader(&mr1, pInfo->readHandle.meta, META_READER_NOLOCK);
|
||||
pAPI->metaReaderFn.initReader(&mr1, pInfo->readHandle.vnode, META_READER_NOLOCK);
|
||||
|
||||
int64_t suid = mr.me.ctbEntry.suid;
|
||||
int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&mr1, suid);
|
||||
|
@ -1292,7 +1292,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
|
|||
|
||||
SSysTableScanInfo* pInfo = pOperator->info;
|
||||
if (pInfo->pCur == NULL) {
|
||||
pInfo->pCur = pAPI->metaFn.openMetaCursor(pInfo->readHandle.meta);
|
||||
pInfo->pCur = pAPI->metaFn.openMetaCursor(pInfo->readHandle.vnode);
|
||||
}
|
||||
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
|
@ -1338,7 +1338,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
|
|||
colDataSetVal(pColInfoData, numOfRows, (char*)&ts, false);
|
||||
|
||||
SMetaReader mr = {0};
|
||||
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.meta, META_READER_NOLOCK);
|
||||
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_NOLOCK);
|
||||
|
||||
uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
|
||||
int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&mr, suid);
|
||||
|
@ -1486,7 +1486,7 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
|
|||
} else {
|
||||
if (pInfo->showRewrite == false) {
|
||||
if (pCondition != NULL && pInfo->pIdx == NULL) {
|
||||
SSTabFltArg arg = {.pMeta = pInfo->readHandle.meta, .pVnode = pInfo->readHandle.vnode};
|
||||
SSTabFltArg arg = {.pMeta = pInfo->readHandle.vnode, .pVnode = pInfo->readHandle.vnode};
|
||||
|
||||
SSysTableIndex* idx = taosMemoryMalloc(sizeof(SSysTableIndex));
|
||||
idx->init = 0;
|
||||
|
|
|
@ -440,11 +440,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
|
|||
int64_t rId = msg.refId;
|
||||
int32_t eId = msg.execId;
|
||||
|
||||
SQWMsg qwMsg = {.node = node,
|
||||
.msg = msg.msg,
|
||||
.msgLen = msg.msgLen,
|
||||
.connInfo = pMsg->info,
|
||||
.msgType = pMsg->msgType};
|
||||
SQWMsg qwMsg = {.node = node, .msg = msg.msg, .msgLen = msg.msgLen, .connInfo = pMsg->info, .msgType = pMsg->msgType};
|
||||
qwMsg.msgInfo.explain = msg.explain;
|
||||
qwMsg.msgInfo.taskType = msg.taskType;
|
||||
qwMsg.msgInfo.needFetch = msg.needFetch;
|
||||
|
|
Loading…
Reference in New Issue