[td-808] build submit block along with table schema.
This commit is contained in:
parent
c64f9e73a2
commit
92d81c703c
|
@ -84,7 +84,7 @@ typedef struct SRetrieveSupport {
|
||||||
SColumnModel * pFinalColModel; // colModel for final result
|
SColumnModel * pFinalColModel; // colModel for final result
|
||||||
SSubqueryState * pState;
|
SSubqueryState * pState;
|
||||||
int32_t subqueryIndex; // index of current vnode in vnode list
|
int32_t subqueryIndex; // index of current vnode in vnode list
|
||||||
SSqlObj * pParentSqlObj;
|
SSqlObj * pParentSql;
|
||||||
tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to
|
tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to
|
||||||
uint32_t numOfRetry; // record the number of retry times
|
uint32_t numOfRetry; // record the number of retry times
|
||||||
pthread_mutex_t queryMutex;
|
pthread_mutex_t queryMutex;
|
||||||
|
|
|
@ -36,6 +36,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql);
|
||||||
|
|
||||||
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql);
|
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql);
|
||||||
|
|
||||||
|
int32_t tscHandleInsertRetry(SSqlObj* pSql);
|
||||||
|
|
||||||
void tscBuildResFromSubqueries(SSqlObj *pSql);
|
void tscBuildResFromSubqueries(SSqlObj *pSql);
|
||||||
void **doSetResultRowData(SSqlObj *pSql, bool finalResult);
|
void **doSetResultRowData(SSqlObj *pSql, bool finalResult);
|
||||||
|
|
||||||
|
|
|
@ -213,8 +213,7 @@ typedef struct SQueryInfo {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int command;
|
int command;
|
||||||
uint8_t msgType;
|
uint8_t msgType;
|
||||||
bool autoCreated; // if the table is missing, on-the-fly create it. during getmeterMeta
|
bool autoCreated; // create table if it is not existed during retrieve table meta in mnode
|
||||||
int8_t dataSourceType; // load data from file or not
|
|
||||||
|
|
||||||
union {
|
union {
|
||||||
int32_t count;
|
int32_t count;
|
||||||
|
@ -222,18 +221,23 @@ typedef struct {
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t insertType;
|
int32_t insertType;
|
||||||
int32_t clauseIndex; // index of multiple subclause query
|
int32_t clauseIndex; // index of multiple subclause query
|
||||||
|
|
||||||
|
char * curSql; // current sql, resume position of sql after parsing paused
|
||||||
int8_t parseFinished;
|
int8_t parseFinished;
|
||||||
|
|
||||||
short numOfCols;
|
short numOfCols;
|
||||||
uint32_t allocSize;
|
uint32_t allocSize;
|
||||||
char * payload;
|
char * payload;
|
||||||
int32_t payloadLen;
|
int32_t payloadLen;
|
||||||
SQueryInfo **pQueryInfo;
|
SQueryInfo **pQueryInfo;
|
||||||
int32_t numOfClause;
|
int32_t numOfClause;
|
||||||
char * curSql; // current sql, resume position of sql after parsing paused
|
|
||||||
void * pTableList; // referred table involved in sql
|
|
||||||
int32_t batchSize; // for parameter ('?') binding and batch processing
|
int32_t batchSize; // for parameter ('?') binding and batch processing
|
||||||
int32_t numOfParams;
|
int32_t numOfParams;
|
||||||
|
|
||||||
|
int8_t dataSourceType; // load data from file or not
|
||||||
|
int8_t submitSchema; // submit block is built with table schema
|
||||||
|
SHashObj *pTableList; // referred table involved in sql
|
||||||
SArray *pDataBlocks; // SArray<STableDataBlocks*> submit data blocks after parsing sql
|
SArray *pDataBlocks; // SArray<STableDataBlocks*> submit data blocks after parsing sql
|
||||||
} SSqlCmd;
|
} SSqlCmd;
|
||||||
|
|
||||||
|
|
|
@ -431,6 +431,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tscDebug("%p get tableMeta successfully", pSql);
|
||||||
|
|
||||||
if (pSql->pStream == NULL) {
|
if (pSql->pStream == NULL) {
|
||||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||||
|
|
||||||
|
@ -446,20 +448,20 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
||||||
assert(code == TSDB_CODE_SUCCESS);
|
assert(code == TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pSql->param != NULL);
|
||||||
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->vgroupIndex >= 0 && pSql->param != NULL);
|
|
||||||
|
|
||||||
SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
|
SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
|
||||||
SSqlObj * pParObj = trs->pParentSqlObj;
|
SSqlObj * pParObj = trs->pParentSql;
|
||||||
|
|
||||||
assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex &&
|
|
||||||
tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0);
|
|
||||||
|
|
||||||
// NOTE: the vgroupInfo for the queried super table must be existed here.
|
// NOTE: the vgroupInfo for the queried super table must be existed here.
|
||||||
assert(pTableMetaInfo->vgroupList != NULL);
|
assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex &&
|
||||||
|
pTableMetaInfo->vgroupIndex >= 0 && pTableMetaInfo->vgroupList != NULL);
|
||||||
|
|
||||||
if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) {
|
if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
goto _error;
|
||||||
} else { // continue to process normal async query
|
} else { // continue to process normal async query
|
||||||
if (pCmd->parseFinished) {
|
if (pCmd->parseFinished) {
|
||||||
tscDebug("%p update table meta in local cache, continue to process sql and send corresponding query", pSql);
|
tscDebug("%p update table meta in local cache, continue to process sql and send corresponding query", pSql);
|
||||||
|
@ -472,18 +474,41 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
||||||
assert(code == TSDB_CODE_SUCCESS);
|
assert(code == TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
// if failed to process sql, go to error handler
|
// in case of insert, redo parsing the sql string and build new submit data block for two reasons:
|
||||||
if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) {
|
// 1. the table Id(tid & uid) may have been update, the submit block needs to be updated
|
||||||
return;
|
// 2. vnode may need the schema information along with submit block to update its local table schema.
|
||||||
|
if (pCmd->command == TSDB_SQL_INSERT) {
|
||||||
|
tscDebug("%p redo parse sql string to build submit block", pSql);
|
||||||
|
|
||||||
|
pCmd->parseFinished = false;
|
||||||
|
if ((code = tsParseSql(pSql, true)) == TSDB_CODE_SUCCESS) {
|
||||||
|
/*
|
||||||
|
* Discard previous built submit blocks, and then parse the sql string again and build up all submit blocks,
|
||||||
|
* and send the required submit block according to index value in supporter to server.
|
||||||
|
*/
|
||||||
|
pSql->fp = pSql->fetchFp; // restore the fp
|
||||||
|
if ((code = tscHandleInsertRetry(pSql)) == TSDB_CODE_SUCCESS) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {// in case of other query type, continue
|
||||||
|
if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// // todo update the submit message according to the new table meta
|
|
||||||
// // 1. table uid, 2. ip address
|
goto _error;
|
||||||
// code = tscSendMsgToServer(pSql);
|
|
||||||
// if (code == TSDB_CODE_SUCCESS) return;
|
|
||||||
} else {
|
} else {
|
||||||
tscDebug("%p continue parse sql after get table meta", pSql);
|
tscDebug("%p continue parse sql after get table meta", pSql);
|
||||||
|
|
||||||
code = tsParseSql(pSql, false);
|
code = tsParseSql(pSql, false);
|
||||||
|
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||||
|
return;
|
||||||
|
} else if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STMT_INSERT)) {
|
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STMT_INSERT)) {
|
||||||
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
||||||
code = tscGetTableMeta(pSql, pTableMetaInfo);
|
code = tscGetTableMeta(pSql, pTableMetaInfo);
|
||||||
|
@ -492,45 +517,49 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
||||||
} else {
|
} else {
|
||||||
assert(code == TSDB_CODE_SUCCESS);
|
assert(code == TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
(*pSql->fp)(pSql->param, pSql, code);
|
(*pSql->fp)(pSql->param, pSql, code);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
|
// proceed to invoke the tscDoQuery();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} else { // stream computing
|
} else { // stream computing
|
||||||
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
||||||
code = tscGetTableMeta(pSql, pTableMetaInfo);
|
|
||||||
pRes->code = code;
|
|
||||||
|
|
||||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
|
code = tscGetTableMeta(pSql, pTableMetaInfo);
|
||||||
|
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||||
|
return;
|
||||||
|
} else if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||||
code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex);
|
code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex);
|
||||||
pRes->code = code;
|
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||||
|
return;
|
||||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
|
} else if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
pSql->res.code = code;
|
|
||||||
tscQueueAsyncRes(pSql);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pSql->pStream) {
|
|
||||||
tscDebug("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
|
tscDebug("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
|
||||||
if (!pSql->cmd.parseFinished) {
|
if (!pSql->cmd.parseFinished) {
|
||||||
tsParseSql(pSql, false);
|
tsParseSql(pSql, false);
|
||||||
sem_post(&pSql->rspSem);
|
sem_post(&pSql->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
} else {
|
|
||||||
tscDebug("%p get tableMeta successfully", pSql);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDoQuery(pSql);
|
tscDoQuery(pSql);
|
||||||
|
return;
|
||||||
|
|
||||||
|
_error:
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
pSql->res.code = code;
|
||||||
|
tscQueueAsyncRes(pSql);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -347,8 +347,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
|
||||||
int doProcessSql(SSqlObj *pSql) {
|
int doProcessSql(SSqlObj *pSql) {
|
||||||
SSqlCmd *pCmd = &pSql->cmd;
|
SSqlCmd *pCmd = &pSql->cmd;
|
||||||
SSqlRes *pRes = &pSql->res;
|
SSqlRes *pRes = &pSql->res;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
|
|
||||||
if (pCmd->command == TSDB_SQL_SELECT ||
|
if (pCmd->command == TSDB_SQL_SELECT ||
|
||||||
pCmd->command == TSDB_SQL_FETCH ||
|
pCmd->command == TSDB_SQL_FETCH ||
|
||||||
pCmd->command == TSDB_SQL_RETRIEVE ||
|
pCmd->command == TSDB_SQL_RETRIEVE ||
|
||||||
|
@ -365,10 +364,13 @@ int doProcessSql(SSqlObj *pSql) {
|
||||||
return pRes->code;
|
return pRes->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tscSendMsgToServer(pSql);
|
int32_t code = tscSendMsgToServer(pSql);
|
||||||
|
|
||||||
|
// NOTE: if code is TSDB_CODE_SUCCESS, pSql may have been released here already by other threads.
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pRes->code = code;
|
pRes->code = code;
|
||||||
tscQueueAsyncRes(pSql);
|
tscQueueAsyncRes(pSql);
|
||||||
|
return pRes->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
typedef struct SInsertSupporter {
|
typedef struct SInsertSupporter {
|
||||||
SSubqueryState* pState;
|
SSubqueryState* pState;
|
||||||
SSqlObj* pSql;
|
SSqlObj* pSql;
|
||||||
|
int32_t index;
|
||||||
} SInsertSupporter;
|
} SInsertSupporter;
|
||||||
|
|
||||||
static void freeJoinSubqueryObj(SSqlObj* pSql);
|
static void freeJoinSubqueryObj(SSqlObj* pSql);
|
||||||
|
@ -1876,13 +1877,36 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
|
||||||
|
|
||||||
// release data block data
|
// release data block data
|
||||||
tfree(pState);
|
tfree(pState);
|
||||||
// pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks);
|
|
||||||
|
|
||||||
// restore user defined fp
|
// restore user defined fp
|
||||||
pParentObj->fp = pParentObj->fetchFp;
|
pParentObj->fp = pParentObj->fetchFp;
|
||||||
|
|
||||||
|
// todo remove this parameter in async callback function definition.
|
||||||
// all data has been sent to vnode, call user function
|
// all data has been sent to vnode, call user function
|
||||||
(*pParentObj->fp)(pParentObj->param, pParentObj, numOfRows);
|
int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS)? pParentObj->res.code:pParentObj->res.numOfRows;
|
||||||
|
(*pParentObj->fp)(pParentObj->param, pParentObj, v);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* it is a subquery, so after parse the sql string, copy the submit block to payload of itself
|
||||||
|
* @param pSql
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
int32_t tscHandleInsertRetry(SSqlObj* pSql) {
|
||||||
|
assert(pSql != NULL && pSql->param != NULL);
|
||||||
|
SSqlCmd* pCmd = &pSql->cmd;
|
||||||
|
SSqlRes* pRes = &pSql->res;
|
||||||
|
|
||||||
|
SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param;
|
||||||
|
assert(pSupporter->index < pSupporter->pState->numOfTotal);
|
||||||
|
|
||||||
|
STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index);
|
||||||
|
pRes->code = tscCopyDataBlockToPayload(pSql, pTableDataBlock);
|
||||||
|
if (pRes->code != TSDB_CODE_SUCCESS) {
|
||||||
|
return pRes->code;
|
||||||
|
}
|
||||||
|
|
||||||
|
return tscProcessSql(pSql);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
|
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
|
||||||
|
@ -1906,10 +1930,11 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
|
||||||
|
|
||||||
while(numOfSub < pSql->numOfSubs) {
|
while(numOfSub < pSql->numOfSubs) {
|
||||||
SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter));
|
SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter));
|
||||||
pSupporter->pSql = pSql;
|
pSupporter->pSql = pSql;
|
||||||
pSupporter->pState = pState;
|
pSupporter->pState = pState;
|
||||||
|
pSupporter->index = numOfSub;
|
||||||
SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT);//createSubqueryObj(pSql, 0, multiVnodeInsertFinalize, pSupporter1, TSDB_SQL_INSERT, NULL);
|
|
||||||
|
SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT);
|
||||||
if (pNew == NULL) {
|
if (pNew == NULL) {
|
||||||
tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, numOfSub, strerror(errno));
|
tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, numOfSub, strerror(errno));
|
||||||
goto _error;
|
goto _error;
|
||||||
|
@ -1940,6 +1965,8 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
|
||||||
return pRes->code; // free all allocated resource
|
return pRes->code; // free all allocated resource
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
|
||||||
|
|
||||||
// use the local variable
|
// use the local variable
|
||||||
for (int32_t j = 0; j < numOfSub; ++j) {
|
for (int32_t j = 0; j < numOfSub; ++j) {
|
||||||
SSqlObj *pSub = pSql->pSubs[j];
|
SSqlObj *pSub = pSql->pSubs[j];
|
||||||
|
@ -1947,7 +1974,6 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
|
||||||
tscProcessSql(pSub);
|
tscProcessSql(pSub);
|
||||||
}
|
}
|
||||||
|
|
||||||
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
|
|
|
@ -562,10 +562,8 @@ int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
|
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bool includeSchema) {
|
||||||
// TODO: optimize this function, handle the case while binary is not presented
|
// TODO: optimize this function, handle the case while binary is not presented
|
||||||
int len = 0;
|
|
||||||
|
|
||||||
STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
|
STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
|
||||||
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
|
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
|
||||||
SSchema* pSchema = tscGetTableSchema(pTableMeta);
|
SSchema* pSchema = tscGetTableSchema(pTableMeta);
|
||||||
|
@ -575,16 +573,37 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
|
||||||
pDataBlock += sizeof(SSubmitBlk);
|
pDataBlock += sizeof(SSubmitBlk);
|
||||||
|
|
||||||
int32_t flen = 0; // original total length of row
|
int32_t flen = 0; // original total length of row
|
||||||
for (int32_t i = 0; i < tinfo.numOfColumns; ++i) {
|
|
||||||
flen += TYPE_BYTES[pSchema[i].type];
|
// schema needs to be included into the submit data block
|
||||||
|
if (includeSchema) {
|
||||||
|
int32_t numOfCols = tscGetNumOfColumns(pTableDataBlock->pTableMeta);
|
||||||
|
for(int32_t j = 0; j < numOfCols; ++j) {
|
||||||
|
STColumn* pCol = (STColumn*) pDataBlock;
|
||||||
|
pCol->colId = pSchema[j].colId;
|
||||||
|
pCol->type = pSchema[j].type;
|
||||||
|
pCol->bytes = pSchema[j].bytes;
|
||||||
|
pCol->offset = 0;
|
||||||
|
|
||||||
|
pDataBlock += sizeof(STColumn);
|
||||||
|
flen += TYPE_BYTES[pSchema[j].type];
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t schemaSize = sizeof(STColumn) * numOfCols;
|
||||||
|
pBlock->schemaLen = schemaSize;
|
||||||
|
} else {
|
||||||
|
for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
|
||||||
|
flen += TYPE_BYTES[pSchema[j].type];
|
||||||
|
}
|
||||||
|
|
||||||
|
pBlock->schemaLen = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* p = pTableDataBlock->pData + sizeof(SSubmitBlk);
|
char* p = pTableDataBlock->pData + sizeof(SSubmitBlk);
|
||||||
pBlock->len = 0;
|
pBlock->dataLen = 0;
|
||||||
int32_t numOfRows = htons(pBlock->numOfRows);
|
int32_t numOfRows = htons(pBlock->numOfRows);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||||
SDataRow trow = (SDataRow)pDataBlock;
|
SDataRow trow = (SDataRow) pDataBlock;
|
||||||
dataRowSetLen(trow, TD_DATA_ROW_HEAD_SIZE + flen);
|
dataRowSetLen(trow, TD_DATA_ROW_HEAD_SIZE + flen);
|
||||||
dataRowSetVersion(trow, pTableMeta->sversion);
|
dataRowSetVersion(trow, pTableMeta->sversion);
|
||||||
|
|
||||||
|
@ -595,20 +614,21 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
|
||||||
p += pSchema[j].bytes;
|
p += pSchema[j].bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
// p += pTableDataBlock->rowSize;
|
|
||||||
pDataBlock += dataRowLen(trow);
|
pDataBlock += dataRowLen(trow);
|
||||||
pBlock->len += dataRowLen(trow);
|
pBlock->dataLen += dataRowLen(trow);
|
||||||
}
|
}
|
||||||
|
|
||||||
len = pBlock->len;
|
int32_t len = pBlock->dataLen + pBlock->schemaLen;
|
||||||
pBlock->len = htonl(pBlock->len);
|
pBlock->dataLen = htonl(pBlock->dataLen);
|
||||||
|
pBlock->schemaLen = htonl(pBlock->schemaLen);
|
||||||
|
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
|
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
|
||||||
SSqlCmd* pCmd = &pSql->cmd;
|
SSqlCmd* pCmd = &pSql->cmd;
|
||||||
|
|
||||||
// the expanded size when a row data is converted to SDataRow format
|
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
|
||||||
const int32_t MAX_EXPAND_SIZE = TD_DATA_ROW_HEAD_SIZE + TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
|
const int32_t MAX_EXPAND_SIZE = TD_DATA_ROW_HEAD_SIZE + TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
|
||||||
|
|
||||||
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
|
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
|
||||||
|
@ -617,7 +637,6 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
|
||||||
size_t total = taosArrayGetSize(pTableDataBlockList);
|
size_t total = taosArrayGetSize(pTableDataBlockList);
|
||||||
for (int32_t i = 0; i < total; ++i) {
|
for (int32_t i = 0; i < total; ++i) {
|
||||||
STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, i);
|
STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, i);
|
||||||
|
|
||||||
STableDataBlocks* dataBuf = NULL;
|
STableDataBlocks* dataBuf = NULL;
|
||||||
|
|
||||||
int32_t ret =
|
int32_t ret =
|
||||||
|
@ -666,16 +685,17 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
|
||||||
pBlocks->uid = htobe64(pBlocks->uid);
|
pBlocks->uid = htobe64(pBlocks->uid);
|
||||||
pBlocks->sversion = htonl(pBlocks->sversion);
|
pBlocks->sversion = htonl(pBlocks->sversion);
|
||||||
pBlocks->numOfRows = htons(pBlocks->numOfRows);
|
pBlocks->numOfRows = htons(pBlocks->numOfRows);
|
||||||
|
pBlocks->schemaLen = 0;
|
||||||
|
|
||||||
// erase the empty space reserved for binary data
|
// erase the empty space reserved for binary data
|
||||||
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock);
|
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, pCmd->submitSchema);
|
||||||
assert(finalLen <= len);
|
assert(finalLen <= len);
|
||||||
|
|
||||||
dataBuf->size += (finalLen + sizeof(SSubmitBlk));
|
dataBuf->size += (finalLen + sizeof(SSubmitBlk));
|
||||||
assert(dataBuf->size <= dataBuf->nAllocSize);
|
assert(dataBuf->size <= dataBuf->nAllocSize);
|
||||||
|
|
||||||
// the length does not include the SSubmitBlk structure
|
// the length does not include the SSubmitBlk structure
|
||||||
pBlocks->len = htonl(finalLen);
|
pBlocks->dataLen = htonl(finalLen);
|
||||||
|
|
||||||
dataBuf->numOfTables += 1;
|
dataBuf->numOfTables += 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -192,7 +192,8 @@ typedef struct SSubmitBlk {
|
||||||
int32_t tid; // table id
|
int32_t tid; // table id
|
||||||
int32_t padding; // TODO just for padding here
|
int32_t padding; // TODO just for padding here
|
||||||
int32_t sversion; // data schema version
|
int32_t sversion; // data schema version
|
||||||
int32_t len; // data part length, not including the SSubmitBlk head
|
int32_t dataLen; // data part length, not including the SSubmitBlk head
|
||||||
|
int32_t schemaLen; // schema length, if length is 0, no schema exists
|
||||||
int16_t numOfRows; // total number of rows in current submit block
|
int16_t numOfRows; // total number of rows in current submit block
|
||||||
char data[];
|
char data[];
|
||||||
} SSubmitBlk;
|
} SSubmitBlk;
|
||||||
|
|
|
@ -768,7 +768,8 @@ static SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) {
|
||||||
SSubmitBlk *pBlock = pIter->pBlock;
|
SSubmitBlk *pBlock = pIter->pBlock;
|
||||||
if (pBlock == NULL) return NULL;
|
if (pBlock == NULL) return NULL;
|
||||||
|
|
||||||
pBlock->len = htonl(pBlock->len);
|
pBlock->dataLen = htonl(pBlock->dataLen);
|
||||||
|
pBlock->schemaLen = htonl(pBlock->schemaLen);
|
||||||
pBlock->numOfRows = htons(pBlock->numOfRows);
|
pBlock->numOfRows = htons(pBlock->numOfRows);
|
||||||
pBlock->uid = htobe64(pBlock->uid);
|
pBlock->uid = htobe64(pBlock->uid);
|
||||||
pBlock->tid = htonl(pBlock->tid);
|
pBlock->tid = htonl(pBlock->tid);
|
||||||
|
@ -776,11 +777,11 @@ static SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) {
|
||||||
pBlock->sversion = htonl(pBlock->sversion);
|
pBlock->sversion = htonl(pBlock->sversion);
|
||||||
pBlock->padding = htonl(pBlock->padding);
|
pBlock->padding = htonl(pBlock->padding);
|
||||||
|
|
||||||
pIter->len = pIter->len + sizeof(SSubmitBlk) + pBlock->len;
|
pIter->len = pIter->len + sizeof(SSubmitBlk) + pBlock->dataLen;
|
||||||
if (pIter->len >= pIter->totalLen) {
|
if (pIter->len >= pIter->totalLen) {
|
||||||
pIter->pBlock = NULL;
|
pIter->pBlock = NULL;
|
||||||
} else {
|
} else {
|
||||||
pIter->pBlock = (SSubmitBlk *)((char *)pBlock + pBlock->len + sizeof(SSubmitBlk));
|
pIter->pBlock = (SSubmitBlk *)((char *)pBlock + pBlock->dataLen + sizeof(SSubmitBlk));
|
||||||
}
|
}
|
||||||
|
|
||||||
return pBlock;
|
return pBlock;
|
||||||
|
@ -832,10 +833,10 @@ _err:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
|
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
|
||||||
if (pBlock->len <= 0) return -1;
|
if (pBlock->dataLen <= 0) return -1;
|
||||||
pIter->totalLen = pBlock->len;
|
pIter->totalLen = pBlock->dataLen;
|
||||||
pIter->len = 0;
|
pIter->len = 0;
|
||||||
pIter->row = (SDataRow)(pBlock->data);
|
pIter->row = (SDataRow)(pBlock->data+pBlock->schemaLen);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue