fix:wrong tag value
This commit is contained in:
parent
db2e05b461
commit
05c9d46fac
|
@ -873,7 +873,7 @@ void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t ord
|
||||||
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo);
|
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo);
|
||||||
int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order,
|
int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order,
|
||||||
int64_t* pData);
|
int64_t* pData);
|
||||||
void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, int64_t groupId,
|
void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId,
|
||||||
SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock);
|
SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock);
|
||||||
|
|
||||||
SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag);
|
SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag);
|
||||||
|
|
|
@ -1259,9 +1259,11 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
|
||||||
memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
|
memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
|
||||||
pInfo->srcRowIndex = 0;
|
pInfo->srcRowIndex = 0;
|
||||||
} break;
|
} break;
|
||||||
|
case STREAM_CREATE_CHILD_TABLE: {
|
||||||
|
return pBlock;
|
||||||
|
} break;
|
||||||
default:
|
default:
|
||||||
ASSERT(0);
|
ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -985,11 +985,13 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
|
||||||
return pDest;
|
return pDest;
|
||||||
}
|
}
|
||||||
|
|
||||||
void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, int64_t groupId,
|
void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId,
|
||||||
SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock) {
|
SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock) {
|
||||||
void* pValue = NULL;
|
void* pValue = NULL;
|
||||||
if (streamStateGetParName(pState, groupId, &pValue) != 0) {
|
if (streamStateGetParName(pState, groupId, &pValue) != 0) {
|
||||||
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId);
|
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId);
|
||||||
|
memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
|
||||||
|
pTmpBlock->info.id.groupId = groupId;
|
||||||
if (pTableSup->numOfExprs > 0) {
|
if (pTableSup->numOfExprs > 0) {
|
||||||
projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL);
|
projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL);
|
||||||
SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
|
SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
|
||||||
|
@ -999,6 +1001,7 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp*
|
||||||
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
|
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
|
||||||
memcpy(tbName, varDataVal(pData), len);
|
memcpy(tbName, varDataVal(pData), len);
|
||||||
streamStatePutParName(pState, groupId, tbName);
|
streamStatePutParName(pState, groupId, tbName);
|
||||||
|
memcpy(pTmpBlock->info.parTbName, tbName, len);
|
||||||
pDestBlock->info.rows--;
|
pDestBlock->info.rows--;
|
||||||
} else {
|
} else {
|
||||||
void* pTbNameCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
|
void* pTbNameCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
|
||||||
|
@ -1013,7 +1016,6 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp*
|
||||||
|
|
||||||
void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
|
void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
|
||||||
colDataAppend(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false);
|
colDataAppend(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false);
|
||||||
pDestBlock->info.id.groupId = groupId;
|
|
||||||
pDestBlock->info.rows++;
|
pDestBlock->info.rows++;
|
||||||
blockDataDestroy(pTmpBlock);
|
blockDataDestroy(pTmpBlock);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue