Merge pull request #21296 from taosdata/fix/liaohj_main
refactor: do some internal refactor.
This commit is contained in:
commit
bd50d6a763
|
@ -2742,7 +2742,7 @@ void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t
|
||||||
// varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
|
// varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
|
||||||
//
|
//
|
||||||
// SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
// SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
// colDataAppend(pColInfo, numOfRows, (const char *)stbName, false);
|
// colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false);
|
||||||
//
|
//
|
||||||
// char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
// char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
// tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB);
|
// tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB);
|
||||||
|
@ -2750,29 +2750,29 @@ void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t
|
||||||
// varDataSetLen(db, strlen(varDataVal(db)));
|
// varDataSetLen(db, strlen(varDataVal(db)));
|
||||||
//
|
//
|
||||||
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
// colDataAppend(pColInfo, numOfRows, (const char *)db, false);
|
// colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
|
||||||
//
|
//
|
||||||
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
// colDataAppend(pColInfo, numOfRows, (const char *)&pStb->createdTime, false);
|
// colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->createdTime, false);
|
||||||
//
|
//
|
||||||
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
// colDataAppend(pColInfo, numOfRows, (const char *)&pStb->numOfColumns, false);
|
// colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfColumns, false);
|
||||||
//
|
//
|
||||||
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
// colDataAppend(pColInfo, numOfRows, (const char *)&pStb->numOfTags, false);
|
// colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfTags, false);
|
||||||
//
|
//
|
||||||
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
// colDataAppend(pColInfo, numOfRows, (const char *)&pStb->updateTime, false); // number of tables
|
// colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->updateTime, false); // number of tables
|
||||||
//
|
//
|
||||||
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
// if (pStb->commentLen > 0) {
|
// if (pStb->commentLen > 0) {
|
||||||
// char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
|
// char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
// STR_TO_VARSTR(comment, pStb->comment);
|
// STR_TO_VARSTR(comment, pStb->comment);
|
||||||
// colDataAppend(pColInfo, numOfRows, comment, false);
|
// colDataSetVal(pColInfo, numOfRows, comment, false);
|
||||||
// } else if (pStb->commentLen == 0) {
|
// } else if (pStb->commentLen == 0) {
|
||||||
// char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
|
// char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
|
||||||
// STR_TO_VARSTR(comment, "");
|
// STR_TO_VARSTR(comment, "");
|
||||||
// colDataAppend(pColInfo, numOfRows, comment, false);
|
// colDataSetVal(pColInfo, numOfRows, comment, false);
|
||||||
// } else {
|
// } else {
|
||||||
// colDataSetNULL(pColInfo, numOfRows);
|
// colDataSetNULL(pColInfo, numOfRows);
|
||||||
// }
|
// }
|
||||||
|
@ -2782,14 +2782,14 @@ void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t
|
||||||
// varDataSetLen(watermark, strlen(varDataVal(watermark)));
|
// varDataSetLen(watermark, strlen(varDataVal(watermark)));
|
||||||
//
|
//
|
||||||
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
// colDataAppend(pColInfo, numOfRows, (const char *)watermark, false);
|
// colDataSetVal(pColInfo, numOfRows, (const char *)watermark, false);
|
||||||
//
|
//
|
||||||
// char maxDelay[64 + VARSTR_HEADER_SIZE] = {0};
|
// char maxDelay[64 + VARSTR_HEADER_SIZE] = {0};
|
||||||
// sprintf(varDataVal(maxDelay), "%" PRId64 "a,%" PRId64 "a", pStb->maxdelay[0], pStb->maxdelay[1]);
|
// sprintf(varDataVal(maxDelay), "%" PRId64 "a,%" PRId64 "a", pStb->maxdelay[0], pStb->maxdelay[1]);
|
||||||
// varDataSetLen(maxDelay, strlen(varDataVal(maxDelay)));
|
// varDataSetLen(maxDelay, strlen(varDataVal(maxDelay)));
|
||||||
//
|
//
|
||||||
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
// colDataAppend(pColInfo, numOfRows, (const char *)maxDelay, false);
|
// colDataSetVal(pColInfo, numOfRows, (const char *)maxDelay, false);
|
||||||
//
|
//
|
||||||
// char rollup[160 + VARSTR_HEADER_SIZE] = {0};
|
// char rollup[160 + VARSTR_HEADER_SIZE] = {0};
|
||||||
// int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs);
|
// int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs);
|
||||||
|
@ -2808,7 +2808,7 @@ void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t
|
||||||
// varDataSetLen(rollup, strlen(varDataVal(rollup)));
|
// varDataSetLen(rollup, strlen(varDataVal(rollup)));
|
||||||
//
|
//
|
||||||
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
// colDataAppend(pColInfo, numOfRows, (const char *)rollup, false);
|
// colDataSetVal(pColInfo, numOfRows, (const char *)rollup, false);
|
||||||
//
|
//
|
||||||
// numOfRows++;
|
// numOfRows++;
|
||||||
// sdbRelease(pSdb, pStb);
|
// sdbRelease(pSdb, pStb);
|
||||||
|
@ -3067,20 +3067,20 @@ static int32_t buildDbColsInfoBlock(const SSDataBlock *p, const SSysTableMeta *p
|
||||||
for (int32_t j = 0; j < pm->colNum; j++) {
|
for (int32_t j = 0; j < pm->colNum; j++) {
|
||||||
// table name
|
// table name
|
||||||
SColumnInfoData *pColInfoData = taosArrayGet(p->pDataBlock, 0);
|
SColumnInfoData *pColInfoData = taosArrayGet(p->pDataBlock, 0);
|
||||||
colDataAppend(pColInfoData, numOfRows, tName, false);
|
colDataSetVal(pColInfoData, numOfRows, tName, false);
|
||||||
|
|
||||||
// database name
|
// database name
|
||||||
pColInfoData = taosArrayGet(p->pDataBlock, 1);
|
pColInfoData = taosArrayGet(p->pDataBlock, 1);
|
||||||
colDataAppend(pColInfoData, numOfRows, dName, false);
|
colDataSetVal(pColInfoData, numOfRows, dName, false);
|
||||||
|
|
||||||
pColInfoData = taosArrayGet(p->pDataBlock, 2);
|
pColInfoData = taosArrayGet(p->pDataBlock, 2);
|
||||||
colDataAppend(pColInfoData, numOfRows, typeName, false);
|
colDataSetVal(pColInfoData, numOfRows, typeName, false);
|
||||||
|
|
||||||
// col name
|
// col name
|
||||||
char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
STR_TO_VARSTR(colName, pm->schema[j].name);
|
STR_TO_VARSTR(colName, pm->schema[j].name);
|
||||||
pColInfoData = taosArrayGet(p->pDataBlock, 3);
|
pColInfoData = taosArrayGet(p->pDataBlock, 3);
|
||||||
colDataAppend(pColInfoData, numOfRows, colName, false);
|
colDataSetVal(pColInfoData, numOfRows, colName, false);
|
||||||
|
|
||||||
// col type
|
// col type
|
||||||
int8_t colType = pm->schema[j].type;
|
int8_t colType = pm->schema[j].type;
|
||||||
|
@ -3095,10 +3095,10 @@ static int32_t buildDbColsInfoBlock(const SSDataBlock *p, const SSysTableMeta *p
|
||||||
(int32_t)((pm->schema[j].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
|
(int32_t)((pm->schema[j].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
|
||||||
}
|
}
|
||||||
varDataSetLen(colTypeStr, colTypeLen);
|
varDataSetLen(colTypeStr, colTypeLen);
|
||||||
colDataAppend(pColInfoData, numOfRows, (char *)colTypeStr, false);
|
colDataSetVal(pColInfoData, numOfRows, (char *)colTypeStr, false);
|
||||||
|
|
||||||
pColInfoData = taosArrayGet(p->pDataBlock, 5);
|
pColInfoData = taosArrayGet(p->pDataBlock, 5);
|
||||||
colDataAppend(pColInfoData, numOfRows, (const char *)&pm->schema[j].bytes, false);
|
colDataSetVal(pColInfoData, numOfRows, (const char *)&pm->schema[j].bytes, false);
|
||||||
for (int32_t k = 6; k <= 8; ++k) {
|
for (int32_t k = 6; k <= 8; ++k) {
|
||||||
pColInfoData = taosArrayGet(p->pDataBlock, k);
|
pColInfoData = taosArrayGet(p->pDataBlock, k);
|
||||||
colDataSetNULL(pColInfoData, numOfRows);
|
colDataSetNULL(pColInfoData, numOfRows);
|
||||||
|
@ -3192,19 +3192,19 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
for (int i = 0; i < pStb->numOfColumns; i++) {
|
for (int i = 0; i < pStb->numOfColumns; i++) {
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)stbName, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)db, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, numOfRows, typeName, false);
|
colDataSetVal(pColInfo, numOfRows, typeName, false);
|
||||||
|
|
||||||
// col name
|
// col name
|
||||||
char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
STR_TO_VARSTR(colName, pStb->pColumns[i].name);
|
STR_TO_VARSTR(colName, pStb->pColumns[i].name);
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, numOfRows, colName, false);
|
colDataSetVal(pColInfo, numOfRows, colName, false);
|
||||||
|
|
||||||
// col type
|
// col type
|
||||||
int8_t colType = pStb->pColumns[i].type;
|
int8_t colType = pStb->pColumns[i].type;
|
||||||
|
@ -3219,10 +3219,10 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
(int32_t)((pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
|
(int32_t)((pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
|
||||||
}
|
}
|
||||||
varDataSetLen(colTypeStr, colTypeLen);
|
varDataSetLen(colTypeStr, colTypeLen);
|
||||||
colDataAppend(pColInfo, numOfRows, (char *)colTypeStr, false);
|
colDataSetVal(pColInfo, numOfRows, (char *)colTypeStr, false);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)&pStb->pColumns[i].bytes, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->pColumns[i].bytes, false);
|
||||||
while (cols < pShow->numOfColumns) {
|
while (cols < pShow->numOfColumns) {
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetNULL(pColInfo, numOfRows);
|
colDataSetNULL(pColInfo, numOfRows);
|
||||||
|
|
|
@ -503,6 +503,74 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t buildResSDataBlock(SSDataBlock* pBlock, SSchemaWrapper* pSchema, const SArray* pColIdList) {
|
||||||
|
if (blockDataGetNumOfCols(pBlock) > 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfCols = taosArrayGetSize(pColIdList);
|
||||||
|
|
||||||
|
if (numOfCols == 0) { // all columns are required
|
||||||
|
for (int32_t i = 0; i < pSchema->nCols; ++i) {
|
||||||
|
SSchema* pColSchema = &pSchema->pSchema[i];
|
||||||
|
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
|
||||||
|
|
||||||
|
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
blockDataFreeRes(pBlock);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (numOfCols > pSchema->nCols) {
|
||||||
|
numOfCols = pSchema->nCols;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t i = 0;
|
||||||
|
int32_t j = 0;
|
||||||
|
while (i < pSchema->nCols && j < numOfCols) {
|
||||||
|
SSchema* pColSchema = &pSchema->pSchema[i];
|
||||||
|
col_id_t colIdSchema = pColSchema->colId;
|
||||||
|
|
||||||
|
col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pColIdList, j);
|
||||||
|
if (colIdSchema < colIdNeed) {
|
||||||
|
i++;
|
||||||
|
} else if (colIdSchema > colIdNeed) {
|
||||||
|
j++;
|
||||||
|
} else {
|
||||||
|
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
|
||||||
|
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
i++;
|
||||||
|
j++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
if (IS_STR_DATA_TYPE(pColVal->type)) {
|
||||||
|
char val[65535 + 2] = {0};
|
||||||
|
if (pColVal->value.pData != NULL) {
|
||||||
|
memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
|
||||||
|
varDataSetLen(val, pColVal->value.nData);
|
||||||
|
code = colDataSetVal(pColumnInfoData, rowIndex, val, !COL_VAL_IS_VALUE(pColVal));
|
||||||
|
} else {
|
||||||
|
colDataSetNULL(pColumnInfoData, rowIndex);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
code = colDataSetVal(pColumnInfoData, rowIndex, (void*)&pColVal->value.val, !COL_VAL_IS_VALUE(pColVal));
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) {
|
int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) {
|
||||||
tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
|
tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
|
||||||
|
|
||||||
|
@ -538,53 +606,11 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet
|
||||||
pReader->cachedSchemaSuid = suid;
|
pReader->cachedSchemaSuid = suid;
|
||||||
pReader->cachedSchemaVer = sversion;
|
pReader->cachedSchemaVer = sversion;
|
||||||
|
|
||||||
SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
|
ASSERT(pReader->cachedSchemaVer == pReader->pSchemaWrapper->version);
|
||||||
if (blockDataGetNumOfCols(pBlock) > 0) {
|
if (blockDataGetNumOfCols(pBlock) == 0) {
|
||||||
blockDataDestroy(pReader->pResBlock);
|
int32_t code = buildResSDataBlock(pReader->pResBlock, pReader->pSchemaWrapper, pReader->pColIdList);
|
||||||
pReader->pResBlock = createDataBlock();
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pBlock = pReader->pResBlock;
|
return code;
|
||||||
|
|
||||||
pBlock->info.id.uid = uid;
|
|
||||||
pBlock->info.version = pReader->msg.ver;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t numOfCols = taosArrayGetSize(pReader->pColIdList);
|
|
||||||
if (numOfCols == 0) { // all columns are required
|
|
||||||
for (int32_t i = 0; i < pSchemaWrapper->nCols; ++i) {
|
|
||||||
SSchema* pColSchema = &pSchemaWrapper->pSchema[i];
|
|
||||||
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
|
|
||||||
|
|
||||||
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
blockDataFreeRes(pBlock);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (numOfCols > pSchemaWrapper->nCols) {
|
|
||||||
numOfCols = pSchemaWrapper->nCols;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t i = 0;
|
|
||||||
int32_t j = 0;
|
|
||||||
while (i < pSchemaWrapper->nCols && j < numOfCols) {
|
|
||||||
SSchema* pColSchema = &pSchemaWrapper->pSchema[i];
|
|
||||||
col_id_t colIdSchema = pColSchema->colId;
|
|
||||||
|
|
||||||
col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pReader->pColIdList, j);
|
|
||||||
if (colIdSchema < colIdNeed) {
|
|
||||||
i++;
|
|
||||||
} else if (colIdSchema > colIdNeed) {
|
|
||||||
j++;
|
|
||||||
} else {
|
|
||||||
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
|
|
||||||
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
i++;
|
|
||||||
j++;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -632,30 +658,15 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet
|
||||||
} else if (pCol->cid == pColData->info.colId) {
|
} else if (pCol->cid == pColData->info.colId) {
|
||||||
for (int32_t i = 0; i < pCol->nVal; i++) {
|
for (int32_t i = 0; i < pCol->nVal; i++) {
|
||||||
tColDataGetValue(pCol, i, &colVal);
|
tColDataGetValue(pCol, i, &colVal);
|
||||||
if (IS_STR_DATA_TYPE(colVal.type)) {
|
int32_t code = doSetVal(pColData, i, &colVal);
|
||||||
if (colVal.value.pData != NULL) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
char val[65535 + 2] = {0};
|
return code;
|
||||||
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
|
|
||||||
varDataSetLen(val, colVal.value.nData);
|
|
||||||
if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
colDataSetNULL(pColData, i);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sourceIdx++;
|
sourceIdx++;
|
||||||
targetIdx++;
|
targetIdx++;
|
||||||
} else {
|
} else {
|
||||||
for (int32_t i = 0; i < pCol->nVal; i++) {
|
colDataSetNNULL(pColData, 0, pCol->nVal);
|
||||||
colDataSetNULL(pColData, i);
|
|
||||||
}
|
|
||||||
|
|
||||||
targetIdx++;
|
targetIdx++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -681,21 +692,9 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet
|
||||||
sourceIdx++;
|
sourceIdx++;
|
||||||
continue;
|
continue;
|
||||||
} else if (colVal.cid == pColData->info.colId) {
|
} else if (colVal.cid == pColData->info.colId) {
|
||||||
if (IS_STR_DATA_TYPE(colVal.type)) {
|
int32_t code = doSetVal(pColData, i, &colVal);
|
||||||
if (colVal.value.pData != NULL) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
char val[65535 + 2] = {0};
|
return code;
|
||||||
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
|
|
||||||
varDataSetLen(val, colVal.value.nData);
|
|
||||||
if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
colDataSetNULL(pColData, i);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sourceIdx++;
|
sourceIdx++;
|
||||||
|
@ -833,14 +832,14 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
|
||||||
char val[65535 + 2];
|
char val[65535 + 2];
|
||||||
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
|
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
|
||||||
varDataSetLen(val, colVal.value.nData);
|
varDataSetLen(val, colVal.value.nData);
|
||||||
if (colDataAppend(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
if (colDataSetVal(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
colDataSetNULL(pColData, curRow - lastRow);
|
colDataSetNULL(pColData, curRow - lastRow);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (colDataAppend(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
if (colDataSetVal(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -930,14 +929,14 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
|
||||||
char val[65535 + 2];
|
char val[65535 + 2];
|
||||||
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
|
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
|
||||||
varDataSetLen(val, colVal.value.nData);
|
varDataSetLen(val, colVal.value.nData);
|
||||||
if (colDataAppend(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
if (colDataSetVal(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
colDataSetNULL(pColData, curRow - lastRow);
|
colDataSetNULL(pColData, curRow - lastRow);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (colDataAppend(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
if (colDataSetVal(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue