rewrite tq read handle
This commit is contained in:
parent
8b1e51d910
commit
9225155119
|
@ -91,7 +91,7 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
|
int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, int16_t* pGroupId, int32_t* pNumOfRows) {
|
||||||
/*int32_t sversion = pHandle->pBlock->sversion;*/
|
/*int32_t sversion = pHandle->pBlock->sversion;*/
|
||||||
// TODO set to real sversion
|
// TODO set to real sversion
|
||||||
int32_t sversion = 0;
|
int32_t sversion = 0;
|
||||||
|
@ -112,7 +112,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
|
||||||
STSchema* pTschema = pHandle->pSchema;
|
STSchema* pTschema = pHandle->pSchema;
|
||||||
SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;
|
SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;
|
||||||
|
|
||||||
int32_t numOfRows = pHandle->pBlock->numOfRows;
|
*pNumOfRows = pHandle->pBlock->numOfRows;
|
||||||
/*int32_t numOfCols = pHandle->pSchema->numOfCols;*/
|
/*int32_t numOfCols = pHandle->pSchema->numOfCols;*/
|
||||||
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
|
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
|
||||||
|
|
||||||
|
@ -120,10 +120,11 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
|
||||||
colNumNeed = pSchemaWrapper->nCols;
|
colNumNeed = pSchemaWrapper->nCols;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* pArray = taosArrayInit(colNumNeed, sizeof(SColumnInfoData));
|
*ppCols = taosArrayInit(colNumNeed, sizeof(SColumnInfoData));
|
||||||
if (pArray == NULL) {
|
if (*ppCols == NULL) {
|
||||||
return NULL;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t colMeta = 0;
|
int32_t colMeta = 0;
|
||||||
int32_t colNeed = 0;
|
int32_t colNeed = 0;
|
||||||
while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
|
while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
|
||||||
|
@ -136,21 +137,24 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
|
||||||
colNeed++;
|
colNeed++;
|
||||||
} else {
|
} else {
|
||||||
SColumnInfoData colInfo = {0};
|
SColumnInfoData colInfo = {0};
|
||||||
/*int sz = numOfRows * pColSchema->bytes;*/
|
|
||||||
colInfo.info.bytes = pColSchema->bytes;
|
colInfo.info.bytes = pColSchema->bytes;
|
||||||
colInfo.info.colId = pColSchema->colId;
|
colInfo.info.colId = pColSchema->colId;
|
||||||
colInfo.info.type = pColSchema->type;
|
colInfo.info.type = pColSchema->type;
|
||||||
|
|
||||||
if (colInfoDataEnsureCapacity(&colInfo, 0, numOfRows) < 0) {
|
if (colInfoDataEnsureCapacity(&colInfo, 0, numOfRows) < 0) {
|
||||||
taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock);
|
goto FAIL;
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
taosArrayPush(pArray, &colInfo);
|
taosArrayPush(*ppCols, &colInfo);
|
||||||
colMeta++;
|
colMeta++;
|
||||||
colNeed++;
|
colNeed++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t colActual = taosArrayGetSize(*ppCols);
|
||||||
|
|
||||||
|
// TODO in stream shuffle case, fetch groupId
|
||||||
|
*pGroupId = 0;
|
||||||
|
|
||||||
STSRowIter iter = {0};
|
STSRowIter iter = {0};
|
||||||
tdSTSRowIterInit(&iter, pTschema);
|
tdSTSRowIterInit(&iter, pTschema);
|
||||||
STSRow* row;
|
STSRow* row;
|
||||||
|
@ -159,22 +163,22 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
|
||||||
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
|
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
|
||||||
tdSTSRowIterReset(&iter, row);
|
tdSTSRowIterReset(&iter, row);
|
||||||
// get all wanted col of that block
|
// get all wanted col of that block
|
||||||
int32_t colTot = taosArrayGetSize(pArray);
|
for (int32_t i = 0; i < colActual; i++) {
|
||||||
for (int32_t i = 0; i < colTot; i++) {
|
SColumnInfoData* pColData = taosArrayGet(*ppCols, i);
|
||||||
SColumnInfoData* pColData = taosArrayGet(pArray, i);
|
|
||||||
SCellVal sVal = {0};
|
SCellVal sVal = {0};
|
||||||
if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
|
if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
/*if (colDataAppend(pColData, curRow, sVal.val, false) < 0) {*/
|
|
||||||
if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) {
|
if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) {
|
||||||
taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock);
|
goto FAIL;
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
curRow++;
|
curRow++;
|
||||||
}
|
}
|
||||||
return pArray;
|
return 0;
|
||||||
|
FAIL:
|
||||||
|
taosArrayDestroy(*ppCols);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; }
|
void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; }
|
||||||
|
|
Loading…
Reference in New Issue