add case
This commit is contained in:
parent
e832deed15
commit
09bef18578
|
@ -275,6 +275,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
pMsg->pCont = NULL;
|
pMsg->pCont = NULL;
|
||||||
pMsg->contLen = 0;
|
pMsg->contLen = 0;
|
||||||
pMsg->code = -1;
|
pMsg->code = -1;
|
||||||
|
ASSERT(0);
|
||||||
rpcSendResponse(pMsg);
|
rpcSendResponse(pMsg);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -356,6 +357,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
void* buf = rpcMallocCont(tlen);
|
void* buf = rpcMallocCont(tlen);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
pMsg->code = -1;
|
pMsg->code = -1;
|
||||||
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
|
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
|
||||||
|
|
|
@ -142,15 +142,6 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
|
||||||
colInfo.info.colId = pColSchema->colId;
|
colInfo.info.colId = pColSchema->colId;
|
||||||
colInfo.info.type = pColSchema->type;
|
colInfo.info.type = pColSchema->type;
|
||||||
|
|
||||||
#if 0
|
|
||||||
colInfo.pData = taosMemoryCalloc(1, sz);
|
|
||||||
if (colInfo.pData == NULL) {
|
|
||||||
// TODO free
|
|
||||||
taosArrayDestroy(pArray);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
if (blockDataEnsureColumnCapacity(&colInfo, numOfRows) < 0) {
|
if (blockDataEnsureColumnCapacity(&colInfo, numOfRows) < 0) {
|
||||||
taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock);
|
taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -161,39 +152,9 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
int j = 0;
|
|
||||||
for (int32_t i = 0; i < colNumNeed; i++) {
|
|
||||||
col_id_t colId = *(col_id_t*)taosArrayGet(pHandle->pColIdList, i);
|
|
||||||
while (j < pSchemaWrapper->nCols && pSchemaWrapper->pSchema[j].colId < colId) {
|
|
||||||
j++;
|
|
||||||
}
|
|
||||||
if (j >= pSchemaWrapper->nCols) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
SSchema* pColSchema = &pSchemaWrapper->pSchema[j];
|
|
||||||
SColumnInfoData colInfo = {0};
|
|
||||||
int sz = numOfRows * pColSchema->bytes;
|
|
||||||
colInfo.info.bytes = pColSchema->bytes;
|
|
||||||
colInfo.info.colId = colId;
|
|
||||||
colInfo.info.type = pColSchema->type;
|
|
||||||
|
|
||||||
colInfo.pData = taosMemoryCalloc(1, sz);
|
|
||||||
if (colInfo.pData == NULL) {
|
|
||||||
// TODO free
|
|
||||||
taosArrayDestroy(pArray);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
blockDataEnsureColumnCapacity(&colInfo, numOfRows);
|
|
||||||
taosArrayPush(pArray, &colInfo);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
STSRowIter iter = {0};
|
STSRowIter iter = {0};
|
||||||
tdSTSRowIterInit(&iter, pTschema);
|
tdSTSRowIterInit(&iter, pTschema);
|
||||||
STSRow* row;
|
STSRow* row;
|
||||||
// int32_t kvIdx = 0;
|
|
||||||
int32_t curRow = 0;
|
int32_t curRow = 0;
|
||||||
tInitSubmitBlkIter(pHandle->pBlock, &pHandle->blkIter);
|
tInitSubmitBlkIter(pHandle->pBlock, &pHandle->blkIter);
|
||||||
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
|
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
|
||||||
|
@ -206,25 +167,9 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
|
||||||
if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
|
if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
memcpy(POINTER_SHIFT(pColData->pData, curRow * pColData->info.bytes), sVal.val, pColData->info.bytes);
|
// TODO handle null
|
||||||
|
colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL);
|
||||||
}
|
}
|
||||||
#if 0
|
|
||||||
for (int32_t i = 0; i < colNumNeed; i++) {
|
|
||||||
SColumnInfoData* pColData = taosArrayGet(pArray, i);
|
|
||||||
STColumn* pCol = schemaColAt(pTschema, i);
|
|
||||||
// TODO
|
|
||||||
if(pCol->colId != pColData->info.colId) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx);
|
|
||||||
SCellVal sVal = {0};
|
|
||||||
if (!tdSTSRowIterNext(&iter, pCol->colId, pCol->type, &sVal)) {
|
|
||||||
// TODO: reach end
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
memcpy(POINTER_SHIFT(pColData->pData, curRow * pCol->bytes), sVal.val, pCol->bytes);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
curRow++;
|
curRow++;
|
||||||
}
|
}
|
||||||
return pArray;
|
return pArray;
|
||||||
|
|
|
@ -140,12 +140,12 @@ if $system_content != @{consume success: 10}@ then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
|
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
|
||||||
#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
|
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
|
||||||
#print cmd result----> $system_content
|
print cmd result----> $system_content
|
||||||
#if $system_content != @{consume success: 10}@ then
|
if $system_content != @{consume success: 10}@ then
|
||||||
# return -1
|
return -1
|
||||||
#endi
|
endi
|
||||||
|
|
||||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2"
|
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2"
|
||||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2"
|
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2"
|
||||||
|
@ -154,12 +154,12 @@ if $system_content != @{consume success: 20}@ then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
|
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
|
||||||
#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
|
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
|
||||||
#print cmd result----> $system_content
|
print cmd result----> $system_content
|
||||||
#if $system_content != @{consume success: 20}@ then
|
if $system_content != @{consume success: 20}@ then
|
||||||
# return -1
|
return -1
|
||||||
#endi
|
endi
|
||||||
|
|
||||||
print =============== create database , vgroup 4
|
print =============== create database , vgroup 4
|
||||||
$dbNamme = d1
|
$dbNamme = d1
|
||||||
|
|
Loading…
Reference in New Issue