From caf3de29591fee19a526686aeff5dc8fc3dc279d Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 18 Jan 2023 09:30:54 +0800 Subject: [PATCH] fix: support writing streams to existing tables --- include/common/tmsg.h | 2 +- source/common/src/tmsg.c | 28 ++++++++++++++++++++++++++ source/libs/parser/src/parTranslater.c | 25 +++++++++++++---------- 3 files changed, 43 insertions(+), 12 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 2fcfd4ec0b..c5e9a12756 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1789,7 +1789,7 @@ typedef struct { // 3.0.2.3 int8_t createStb; uint64_t targetStbUid; - SArray* fillNullCols; + SArray* fillNullCols; // array of SColLocation } SCMCreateStreamReq; typedef struct { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 87ad592afb..20bb265e78 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5428,6 +5428,13 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS } if (tEncodeI8(&encoder, pReq->createStb) < 0) return -1; if (tEncodeU64(&encoder, pReq->targetStbUid) < 0) return -1; + if (tEncodeI32(&encoder, taosArrayGetSize(pReq->fillNullCols)) < 0) return -1; + for (int32_t i = 0; i < taosArrayGetSize(pReq->fillNullCols); ++i) { + SColLocation *pCol = taosArrayGet(pReq->fillNullCols, i); + if (tEncodeI16(&encoder, pCol->slotId) < 0) return -1; + if (tEncodeI16(&encoder, pCol->colId) < 0) return -1; + if (tEncodeI8(&encoder, pCol->type) < 0) return -1; + } tEndEncode(&encoder); @@ -5490,6 +5497,26 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea } if (tDecodeI8(&decoder, &pReq->createStb) < 0) return -1; if (tDecodeU64(&decoder, &pReq->targetStbUid) < 0) return -1; + int32_t numOfFillNullCols = 0; + if (tDecodeI32(&decoder, &numOfFillNullCols) < 0) return -1; + if (numOfFillNullCols > 0) { + pReq->fillNullCols = taosArrayInit(numOfFillNullCols, sizeof(SColLocation)); + if (pReq->fillNullCols == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + for (int32_t i = 0; i < numOfFillNullCols; ++i) { + SColLocation col = {0}; + if (tDecodeI16(&decoder, &col.slotId) < 0) return -1; + if (tDecodeI16(&decoder, &col.colId) < 0) return -1; + if (tDecodeI8(&decoder, &col.type) < 0) return -1; + if (taosArrayPush(pReq->fillNullCols, &col) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + } + } tEndDecode(&decoder); @@ -5559,6 +5586,7 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) { taosArrayDestroy(pReq->pTags); taosMemoryFreeClear(pReq->sql); taosMemoryFreeClear(pReq->ast); + taosArrayDestroy(pReq->fillNullCols); } int32_t tEncodeSRSmaParam(SEncoder *pCoder, const SRSmaParam *pRSmaParam) { diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index f6a2b0a025..395f90d070 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2210,7 +2210,8 @@ static int32_t dnodeToVgroupsInfo(SArray* pDnodes, SVgroupsInfo** pVgsInfo) { } static bool sysTableFromVnode(const char* pTable) { - return ((0 == strcmp(pTable, TSDB_INS_TABLE_TABLES)) || (0 == strcmp(pTable, TSDB_INS_TABLE_TAGS)) || (0 == strcmp(pTable, TSDB_INS_TABLE_COLS))); + return ((0 == strcmp(pTable, TSDB_INS_TABLE_TABLES)) || (0 == strcmp(pTable, TSDB_INS_TABLE_TAGS)) || + (0 == strcmp(pTable, TSDB_INS_TABLE_COLS))); } static bool sysTableFromDnode(const char* pTable) { return 0 == strcmp(pTable, TSDB_INS_TABLE_DNODE_VARIABLES); } @@ -2278,8 +2279,9 @@ static int32_t setVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName, ((SSelectStmt*)pCxt->pCurrStmt)->isEmptyResult = true; } - if (TSDB_CODE_SUCCESS == code && (0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES) && !hasUserDbCond) || - 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_COLS)) { + if (TSDB_CODE_SUCCESS == code && + (0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES) && !hasUserDbCond) || + 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_COLS)) { code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &pVgs); } @@ -5804,18 +5806,19 @@ static int32_t setFillNullCols(SArray* pProjColPos, const STableMeta* pMeta, SCM if (NULL == pReq->fillNullCols) { return TSDB_CODE_OUT_OF_MEMORY; } - int32_t indexOfSchema = 0; const SSchema* pSchemas = getTableColumnSchema(pMeta); - for (int32_t i = 0; i < numOfBoundCols; ++i) { - SProjColPos* pPos = taosArrayGet(pProjColPos, i); - while (indexOfSchema < pMeta->tableInfo.numOfColumns) { - const SSchema* pSchema = pSchemas + indexOfSchema++; + int32_t indexOfBoundCols = 0; + for (int32_t i = 0; i < pMeta->tableInfo.numOfColumns; ++i) { + const SSchema* pSchema = pSchemas + i; + if (indexOfBoundCols < numOfBoundCols) { + SProjColPos* pPos = taosArrayGet(pProjColPos, indexOfBoundCols); if (pSchema->colId == pPos->colId) { - break; + ++indexOfBoundCols; + continue; } - SColLocation colLoc = {.colId = pSchema->colId, .slotId = indexOfSchema - 1, .type = pSchema->type}; - taosArrayPush(pReq->fillNullCols, &colLoc); } + SColLocation colLoc = {.colId = pSchema->colId, .slotId = i, .type = pSchema->type}; + taosArrayPush(pReq->fillNullCols, &colLoc); } return TSDB_CODE_SUCCESS; }