add tq support for submitblk scanner
This commit is contained in:
parent
1de23b8b98
commit
b6e826df6c
|
@ -521,30 +521,30 @@ TEST(testCase, show_stable_Test) {
|
||||||
// taosHashCleanup(phash);
|
// taosHashCleanup(phash);
|
||||||
//}
|
//}
|
||||||
//
|
//
|
||||||
//TEST(testCase, create_topic_Test) {
|
TEST(testCase, create_topic_Test) {
|
||||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
// assert(pConn != NULL);
|
assert(pConn != NULL);
|
||||||
//
|
|
||||||
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||||
// if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
// printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||||
// }
|
}
|
||||||
// taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
//
|
|
||||||
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||||
// ASSERT_TRUE(pFields == nullptr);
|
ASSERT_TRUE(pFields == nullptr);
|
||||||
//
|
|
||||||
// int32_t numOfFields = taos_num_fields(pRes);
|
int32_t numOfFields = taos_num_fields(pRes);
|
||||||
// ASSERT_EQ(numOfFields, 0);
|
ASSERT_EQ(numOfFields, 0);
|
||||||
//
|
|
||||||
// taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
//
|
|
||||||
// char* sql = "select * from tu";
|
char* sql = "select * from tu";
|
||||||
// pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
|
pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
|
||||||
// taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
// taos_close(pConn);
|
taos_close(pConn);
|
||||||
//}
|
}
|
||||||
//
|
|
||||||
//TEST(testCase, insert_test) {
|
//TEST(testCase, insert_test) {
|
||||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
// ASSERT_NE(pConn, nullptr);
|
// ASSERT_NE(pConn, nullptr);
|
||||||
|
|
|
@ -74,6 +74,15 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
||||||
SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
|
SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
||||||
|
int32_t logicalPlanLen = strlen(pTopic->logicalPlan) + 1;
|
||||||
|
SDB_SET_INT32(pRaw, dataPos, strlen(pTopic->logicalPlan)+1, TOPIC_ENCODE_OVER);
|
||||||
|
SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, logicalPlanLen, TOPIC_ENCODE_OVER);
|
||||||
|
|
||||||
|
int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
|
||||||
|
pTopic->physicalPlan = calloc(physicalPlanLen, sizeof(char));
|
||||||
|
if (pTopic->physicalPlan == NULL) goto TOPIC_ENCODE_OVER;
|
||||||
|
SDB_SET_INT32(pRaw, dataPos, strlen(pTopic->physicalPlan)+1, TOPIC_ENCODE_OVER);
|
||||||
|
SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
|
||||||
|
|
||||||
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
|
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);
|
SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);
|
||||||
|
@ -83,6 +92,12 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
||||||
TOPIC_ENCODE_OVER:
|
TOPIC_ENCODE_OVER:
|
||||||
if (terrno != TSDB_CODE_SUCCESS) {
|
if (terrno != TSDB_CODE_SUCCESS) {
|
||||||
mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
|
mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
|
||||||
|
/*if (pTopic->logicalPlan) {*/
|
||||||
|
/*free(pTopic->logicalPlan);*/
|
||||||
|
/*}*/
|
||||||
|
/*if (pTopic->physicalPlan) {*/
|
||||||
|
/*free(pTopic->physicalPlan);*/
|
||||||
|
/*}*/
|
||||||
sdbFreeRaw(pRaw);
|
sdbFreeRaw(pRaw);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -121,10 +136,23 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
||||||
|
|
||||||
pTopic->sql = calloc(pTopic->sqlLen + 1, sizeof(char));
|
pTopic->sql = calloc(pTopic->sqlLen + 1, sizeof(char));
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
|
SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
|
||||||
// SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
|
|
||||||
// SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len, TOPIC_DECODE_OVER);
|
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
|
||||||
// SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
|
pTopic->logicalPlan = calloc(len+1, sizeof(char));
|
||||||
// SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
|
if (pTopic->logicalPlan == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto TOPIC_DECODE_OVER;
|
||||||
|
}
|
||||||
|
SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len+1, TOPIC_DECODE_OVER);
|
||||||
|
|
||||||
|
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
|
||||||
|
pTopic->logicalPlan = calloc(len + 1, sizeof(char));
|
||||||
|
if (pTopic->physicalPlan == NULL) {
|
||||||
|
free(pTopic->logicalPlan);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto TOPIC_DECODE_OVER;
|
||||||
|
}
|
||||||
|
SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len+1, TOPIC_DECODE_OVER);
|
||||||
|
|
||||||
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER)
|
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER)
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
#include "meta.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -314,6 +315,26 @@ const void* tqDeserializeGroup(const STqSerializedHead*, STqGroup**);
|
||||||
|
|
||||||
static int tqQueryExecuting(int32_t status) { return status; }
|
static int tqQueryExecuting(int32_t status) { return status; }
|
||||||
|
|
||||||
|
typedef struct STqReadHandle {
|
||||||
|
int64_t ver;
|
||||||
|
SSubmitMsg* pMsg;
|
||||||
|
SSubmitBlk* pBlock;
|
||||||
|
SSubmitMsgIter msgIter;
|
||||||
|
SSubmitBlkIter blkIter;
|
||||||
|
SMeta* pMeta;
|
||||||
|
} STqReadHandle;
|
||||||
|
|
||||||
|
typedef struct SSubmitBlkScanInfo {
|
||||||
|
|
||||||
|
} SSubmitBlkScanInfo;
|
||||||
|
|
||||||
|
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg);
|
||||||
|
bool tqNextDataBlock(STqReadHandle* pHandle);
|
||||||
|
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo *pBlockInfo);
|
||||||
|
//return SArray<SColumnInfoData>
|
||||||
|
SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList);
|
||||||
|
//int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t status);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -607,3 +607,70 @@ int tqItemSSize() {
|
||||||
// mainly for executor
|
// mainly for executor
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg) {
|
||||||
|
STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle));
|
||||||
|
if (pReadHandle == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pReadHandle->pMeta = pMeta;
|
||||||
|
pReadHandle->pMsg = pMsg;
|
||||||
|
tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter);
|
||||||
|
pReadHandle->ver = -1;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool tqNextDataBlock(STqReadHandle* pHandle) {
|
||||||
|
if(tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) {
|
||||||
|
SMemRow row;
|
||||||
|
int32_t sversion = pHandle->pBlock->sversion;
|
||||||
|
SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);
|
||||||
|
pBlockInfo->numOfCols = pSchema->nCols;
|
||||||
|
pBlockInfo->rows = pHandle->pBlock->numOfRows;
|
||||||
|
pBlockInfo->uid = pHandle->pBlock->uid;
|
||||||
|
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
|
||||||
|
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) {
|
||||||
|
int32_t sversion = pHandle->pBlock->sversion;
|
||||||
|
SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true);
|
||||||
|
STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion);
|
||||||
|
SArray *pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData));
|
||||||
|
if (pArray == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
SColumnInfoData colInfo;
|
||||||
|
int sz = pSchemaWrapper->nCols * pSchemaWrapper->pSchema->bytes;
|
||||||
|
colInfo.pData = malloc(sz);
|
||||||
|
if (colInfo.pData == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < pTschema->numOfCols; i++) {
|
||||||
|
taosArrayPush(pColumnIdList, &(schemaColAt(pTschema, i)->colId));
|
||||||
|
}
|
||||||
|
|
||||||
|
SMemRow row;
|
||||||
|
int32_t kvIdx;
|
||||||
|
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
|
||||||
|
for (int i = 0; i < pTschema->numOfCols && kvIdx < pTschema->numOfCols; i++) {
|
||||||
|
STColumn *pCol = schemaColAt(pTschema, i);
|
||||||
|
void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx);
|
||||||
|
//TODO: handle varlen
|
||||||
|
memcpy(POINTER_SHIFT(colInfo.pData, pCol->offset), val, pCol->bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosArrayPush(pArray, &colInfo);
|
||||||
|
return pArray;
|
||||||
|
}
|
||||||
|
/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t status) {*/
|
||||||
|
/*return 0;*/
|
||||||
|
/*}*/
|
||||||
|
|
Loading…
Reference in New Issue