fix: support writing streams to existing tables

This commit is contained in:
Xiaoyu Wang 2023-01-18 09:30:54 +08:00
parent 8d26b18cfd
commit caf3de2959
3 changed files with 43 additions and 12 deletions

View File

@ -1789,7 +1789,7 @@ typedef struct {
// 3.0.2.3 // 3.0.2.3
int8_t createStb; int8_t createStb;
uint64_t targetStbUid; uint64_t targetStbUid;
SArray* fillNullCols; SArray* fillNullCols; // array of SColLocation
} SCMCreateStreamReq; } SCMCreateStreamReq;
typedef struct { typedef struct {

View File

@ -5428,6 +5428,13 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
} }
if (tEncodeI8(&encoder, pReq->createStb) < 0) return -1; if (tEncodeI8(&encoder, pReq->createStb) < 0) return -1;
if (tEncodeU64(&encoder, pReq->targetStbUid) < 0) return -1; if (tEncodeU64(&encoder, pReq->targetStbUid) < 0) return -1;
if (tEncodeI32(&encoder, taosArrayGetSize(pReq->fillNullCols)) < 0) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pReq->fillNullCols); ++i) {
SColLocation *pCol = taosArrayGet(pReq->fillNullCols, i);
if (tEncodeI16(&encoder, pCol->slotId) < 0) return -1;
if (tEncodeI16(&encoder, pCol->colId) < 0) return -1;
if (tEncodeI8(&encoder, pCol->type) < 0) return -1;
}
tEndEncode(&encoder); tEndEncode(&encoder);
@ -5490,6 +5497,26 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
} }
if (tDecodeI8(&decoder, &pReq->createStb) < 0) return -1; if (tDecodeI8(&decoder, &pReq->createStb) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->targetStbUid) < 0) return -1; if (tDecodeU64(&decoder, &pReq->targetStbUid) < 0) return -1;
int32_t numOfFillNullCols = 0;
if (tDecodeI32(&decoder, &numOfFillNullCols) < 0) return -1;
if (numOfFillNullCols > 0) {
pReq->fillNullCols = taosArrayInit(numOfFillNullCols, sizeof(SColLocation));
if (pReq->fillNullCols == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < numOfFillNullCols; ++i) {
SColLocation col = {0};
if (tDecodeI16(&decoder, &col.slotId) < 0) return -1;
if (tDecodeI16(&decoder, &col.colId) < 0) return -1;
if (tDecodeI8(&decoder, &col.type) < 0) return -1;
if (taosArrayPush(pReq->fillNullCols, &col) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
}
tEndDecode(&decoder); tEndDecode(&decoder);
@ -5559,6 +5586,7 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
taosArrayDestroy(pReq->pTags); taosArrayDestroy(pReq->pTags);
taosMemoryFreeClear(pReq->sql); taosMemoryFreeClear(pReq->sql);
taosMemoryFreeClear(pReq->ast); taosMemoryFreeClear(pReq->ast);
taosArrayDestroy(pReq->fillNullCols);
} }
int32_t tEncodeSRSmaParam(SEncoder *pCoder, const SRSmaParam *pRSmaParam) { int32_t tEncodeSRSmaParam(SEncoder *pCoder, const SRSmaParam *pRSmaParam) {

View File

@ -2210,7 +2210,8 @@ static int32_t dnodeToVgroupsInfo(SArray* pDnodes, SVgroupsInfo** pVgsInfo) {
} }
static bool sysTableFromVnode(const char* pTable) { static bool sysTableFromVnode(const char* pTable) {
return ((0 == strcmp(pTable, TSDB_INS_TABLE_TABLES)) || (0 == strcmp(pTable, TSDB_INS_TABLE_TAGS)) || (0 == strcmp(pTable, TSDB_INS_TABLE_COLS))); return ((0 == strcmp(pTable, TSDB_INS_TABLE_TABLES)) || (0 == strcmp(pTable, TSDB_INS_TABLE_TAGS)) ||
(0 == strcmp(pTable, TSDB_INS_TABLE_COLS)));
} }
static bool sysTableFromDnode(const char* pTable) { return 0 == strcmp(pTable, TSDB_INS_TABLE_DNODE_VARIABLES); } static bool sysTableFromDnode(const char* pTable) { return 0 == strcmp(pTable, TSDB_INS_TABLE_DNODE_VARIABLES); }
@ -2278,7 +2279,8 @@ static int32_t setVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName,
((SSelectStmt*)pCxt->pCurrStmt)->isEmptyResult = true; ((SSelectStmt*)pCxt->pCurrStmt)->isEmptyResult = true;
} }
if (TSDB_CODE_SUCCESS == code && (0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES) && !hasUserDbCond) || if (TSDB_CODE_SUCCESS == code &&
(0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES) && !hasUserDbCond) ||
0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_COLS)) { 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_COLS)) {
code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &pVgs); code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &pVgs);
} }
@ -5804,19 +5806,20 @@ static int32_t setFillNullCols(SArray* pProjColPos, const STableMeta* pMeta, SCM
if (NULL == pReq->fillNullCols) { if (NULL == pReq->fillNullCols) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
int32_t indexOfSchema = 0;
const SSchema* pSchemas = getTableColumnSchema(pMeta); const SSchema* pSchemas = getTableColumnSchema(pMeta);
for (int32_t i = 0; i < numOfBoundCols; ++i) { int32_t indexOfBoundCols = 0;
SProjColPos* pPos = taosArrayGet(pProjColPos, i); for (int32_t i = 0; i < pMeta->tableInfo.numOfColumns; ++i) {
while (indexOfSchema < pMeta->tableInfo.numOfColumns) { const SSchema* pSchema = pSchemas + i;
const SSchema* pSchema = pSchemas + indexOfSchema++; if (indexOfBoundCols < numOfBoundCols) {
SProjColPos* pPos = taosArrayGet(pProjColPos, indexOfBoundCols);
if (pSchema->colId == pPos->colId) { if (pSchema->colId == pPos->colId) {
break; ++indexOfBoundCols;
continue;
} }
SColLocation colLoc = {.colId = pSchema->colId, .slotId = indexOfSchema - 1, .type = pSchema->type}; }
SColLocation colLoc = {.colId = pSchema->colId, .slotId = i, .type = pSchema->type};
taosArrayPush(pReq->fillNullCols, &colLoc); taosArrayPush(pReq->fillNullCols, &colLoc);
} }
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }