update table meta based on query sversion

This commit is contained in:
dapan1121 2022-05-19 22:08:06 +08:00
parent 9ef37af528
commit d472c28142
9 changed files with 86 additions and 11 deletions

View File

@ -1352,6 +1352,9 @@ typedef struct {
typedef struct { typedef struct {
int32_t code; int32_t code;
char tbFName[TSDB_TABLE_FNAME_LEN];
int32_t sversion;
int32_t tversion;
} SResReadyRsp; } SResReadyRsp;
typedef struct { typedef struct {

View File

@ -57,6 +57,12 @@ typedef struct SIndexMeta {
} SIndexMeta; } SIndexMeta;
typedef struct STbVerInfo {
char tbFName[TSDB_TABLE_FNAME_LEN];
int32_t sversion;
int32_t tversion;
} STbVerInfo;
/* /*
* ASSERT(sizeof(SCTableMeta) == 24) * ASSERT(sizeof(SCTableMeta) == 24)
* ASSERT(tableType == TSDB_CHILD_TABLE) * ASSERT(tableType == TSDB_CHILD_TABLE)

View File

@ -344,7 +344,23 @@ int32_t validateSversion(SRequestObj* pRequest, void* res) {
taosArrayPush(pArray, &tbSver); taosArrayPush(pArray, &tbSver);
} }
} else if (TDMT_VND_QUERY == pRequest->type) { } else if (TDMT_VND_QUERY == pRequest->type) {
SArray* pTbArray = (SArray*)res;
int32_t tbNum = taosArrayGetSize(pTbArray);
if (tbNum <= 0) {
return TSDB_CODE_SUCCESS;
}
pArray = taosArrayInit(tbNum, sizeof(STbSVersion));
if (NULL == pArray) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < tbNum; ++i) {
STbVerInfo* tbInfo = taosArrayGet(pTbArray, i);
STbSVersion tbSver = {.tbFName = tbInfo->tbFName, .sver = tbInfo->sversion};
taosArrayPush(pArray, &tbSver);
}
} }
SCatalog* pCatalog = NULL; SCatalog* pCatalog = NULL;
@ -369,7 +385,7 @@ void freeRequestRes(SRequestObj* pRequest, void* res) {
if (TDMT_VND_SUBMIT == pRequest->type) { if (TDMT_VND_SUBMIT == pRequest->type) {
tFreeSSubmitRsp((SSubmitRsp*)res); tFreeSSubmitRsp((SSubmitRsp*)res);
} else if (TDMT_VND_QUERY == pRequest->type) { } else if (TDMT_VND_QUERY == pRequest->type) {
taosArrayDestroy((SArray *)res);
} }
} }

View File

@ -177,8 +177,16 @@ int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tab
*sversion = pTaskInfo->schemaVer.sversion; *sversion = pTaskInfo->schemaVer.sversion;
*tversion = pTaskInfo->schemaVer.tversion; *tversion = pTaskInfo->schemaVer.tversion;
strcpy(dbName, pTaskInfo->schemaVer.dbname); if (pTaskInfo->schemaVer.dbname) {
strcpy(tableName, pTaskInfo->schemaVer.tablename); strcpy(dbName, pTaskInfo->schemaVer.dbname);
} else {
dbName[0] = 0;
}
if (pTaskInfo->schemaVer.tablename) {
strcpy(tableName, pTaskInfo->schemaVer.tablename);
} else {
tableName[0] = 0;
}
return 0; return 0;
} }

View File

@ -131,6 +131,7 @@ typedef struct SQWTaskCtx {
void *taskHandle; void *taskHandle;
void *sinkHandle; void *sinkHandle;
SSubplan *plan; SSubplan *plan;
STbVerInfo tbInfo;
} SQWTaskCtx; } SQWTaskCtx;
typedef struct SQWSchStatus { typedef struct SQWSchStatus {

View File

@ -36,7 +36,7 @@ int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, i
int32_t code); int32_t code);
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code); int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo);
int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code); int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code);
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num); int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num);
void qwFreeFetchRsp(void *msg); void qwFreeFetchRsp(void *msg);

View File

@ -718,6 +718,16 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
char dbFName[TSDB_DB_FNAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN];
qGetQueriedTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion);
sprintf(ctx->tbInfo.tbFName, "%s.%s", dbFName, tbName);
}
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
int32_t code = 0; int32_t code = 0;
SQWTaskCtx *ctx = NULL; SQWTaskCtx *ctx = NULL;
@ -899,6 +909,11 @@ _return:
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PARTIAL_SUCCEED); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PARTIAL_SUCCEED);
} }
if (readyConnection) {
qwBuildAndSendReadyRsp(readyConnection, code, ctx ? &ctx->tbInfo : NULL);
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code));
}
if (ctx) { if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code); QW_UPDATE_RSP_CODE(ctx, code);
@ -910,11 +925,6 @@ _return:
qwReleaseTaskCtx(mgmt, ctx); qwReleaseTaskCtx(mgmt, ctx);
} }
if (readyConnection) {
qwBuildAndSendReadyRsp(readyConnection, code);
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code));
}
if (code) { if (code) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
} }
@ -975,6 +985,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
atomic_store_ptr(&ctx->sinkHandle, sinkHandle); atomic_store_ptr(&ctx->sinkHandle, sinkHandle);
if (pTaskInfo && sinkHandle) { if (pTaskInfo && sinkHandle) {
qwSaveTbVersionInfo(pTaskInfo, ctx);
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
} }
@ -1047,7 +1058,7 @@ _return:
} }
if (needRsp) { if (needRsp) {
qwBuildAndSendReadyRsp(&qwMsg->connInfo, code); qwBuildAndSendReadyRsp(&qwMsg->connInfo, code, NULL);
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
} }

View File

@ -63,9 +63,14 @@ int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code) { int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo) {
SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp)); SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp));
pRsp->code = code; pRsp->code = code;
if (tbInfo) {
strcpy(pRsp->tbFName, tbInfo->tbFName);
pRsp->sversion = tbInfo->sversion;
pRsp->tversion = tbInfo->tversion;
}
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_RES_READY_RSP, .msgType = TDMT_VND_RES_READY_RSP,

View File

@ -1070,6 +1070,27 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schSaveJobQueryRes(SSchJob *pJob, SResReadyRsp *rsp) {
if (rsp->tbFName[0]) {
if (NULL == pJob->resData) {
pJob->resData = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo));
if (NULL == pJob->resData) {
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
}
STbVerInfo tbInfo;
strcpy(tbInfo.tbFName, rsp->tbFName);
tbInfo.sversion = rsp->sversion;
tbInfo.tversion = rsp->tversion;
taosArrayPush((SArray *)pJob->resData, &tbInfo);
}
return TSDB_CODE_SUCCESS;
}
// Note: no more task error processing, handled in function internal // Note: no more task error processing, handled in function internal
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize,
int32_t rspCode) { int32_t rspCode) {
@ -1225,6 +1246,10 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} }
SCH_ERR_JRET(rsp->code); SCH_ERR_JRET(rsp->code);
pJob->resType = SCH_RES_TYPE_QUERY;
SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp));
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break; break;