Merge pull request #1405 from taosdata/liaohj_2
[td-32] fix bugs in insertion and retrieve data
This commit is contained in:
commit
88f44b1adb
|
@ -18,9 +18,10 @@
|
|||
|
||||
#define _XOPEN_SOURCE
|
||||
|
||||
#include "hash.h"
|
||||
#include "os.h"
|
||||
#include "tscSecondaryMerge.h"
|
||||
|
||||
#include "hash.h"
|
||||
//#include "tscSecondaryMerge.h"
|
||||
#include "tscUtil.h"
|
||||
#include "tschemautil.h"
|
||||
#include "tsclient.h"
|
||||
|
@ -32,6 +33,8 @@
|
|||
#include "tstoken.h"
|
||||
#include "ttime.h"
|
||||
|
||||
#include "dataformat.h"
|
||||
|
||||
enum {
|
||||
TSDB_USE_SERVER_TS = 0,
|
||||
TSDB_USE_CLI_TS = 1,
|
||||
|
@ -393,7 +396,6 @@ static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start
|
|||
int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[], SParsedDataColInfo *spd, char *error,
|
||||
int16_t timePrec, int32_t *code, char *tmpTokenBuf) {
|
||||
int32_t index = 0;
|
||||
// bool isPrevOptr; //fang, never used
|
||||
SSQLToken sToken = {0};
|
||||
char * payload = pDataBlocks->pData + pDataBlocks->size;
|
||||
|
||||
|
@ -604,8 +606,8 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int3
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void tsSetBlockInfo(SShellSubmitBlock *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
|
||||
pBlocks->sid = pTableMeta->sid;
|
||||
static void tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
|
||||
pBlocks->tid = pTableMeta->sid;
|
||||
pBlocks->uid = pTableMeta->uid;
|
||||
pBlocks->sversion = pTableMeta->sversion;
|
||||
pBlocks->numOfRows += numOfRows;
|
||||
|
@ -613,10 +615,10 @@ static void tsSetBlockInfo(SShellSubmitBlock *pBlocks, const STableMeta *pTableM
|
|||
|
||||
// data block is disordered, sort it in ascending order
|
||||
void sortRemoveDuplicates(STableDataBlocks *dataBuf) {
|
||||
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)dataBuf->pData;
|
||||
SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;
|
||||
|
||||
// size is less than the total size, since duplicated rows may be removed yet.
|
||||
assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SShellSubmitBlock) == dataBuf->size);
|
||||
assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SSubmitBlk) == dataBuf->size);
|
||||
|
||||
// if use server time, this block must be ordered
|
||||
if (dataBuf->tsSource == TSDB_USE_SERVER_TS) {
|
||||
|
@ -624,7 +626,7 @@ void sortRemoveDuplicates(STableDataBlocks *dataBuf) {
|
|||
}
|
||||
|
||||
if (!dataBuf->ordered) {
|
||||
char *pBlockData = pBlocks->payLoad;
|
||||
char *pBlockData = pBlocks->data;
|
||||
qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
|
||||
|
||||
int32_t i = 0;
|
||||
|
@ -650,7 +652,7 @@ void sortRemoveDuplicates(STableDataBlocks *dataBuf) {
|
|||
dataBuf->ordered = true;
|
||||
|
||||
pBlocks->numOfRows = i + 1;
|
||||
dataBuf->size = sizeof(SShellSubmitBlock) + dataBuf->rowSize * pBlocks->numOfRows;
|
||||
dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -663,7 +665,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char
|
|||
|
||||
STableDataBlocks *dataBuf = NULL;
|
||||
int32_t ret = tscGetDataBlockFromList(pTableHashList, pCmd->pDataBlocks, pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
|
||||
sizeof(SShellSubmitBlock), tinfo.rowSize, pTableMetaInfo->name,
|
||||
sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name,
|
||||
pTableMeta, &dataBuf);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return ret;
|
||||
|
@ -691,11 +693,11 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char
|
|||
SParamInfo *param = dataBuf->params + i;
|
||||
if (param->idx == -1) {
|
||||
param->idx = pCmd->numOfParams++;
|
||||
param->offset -= sizeof(SShellSubmitBlock);
|
||||
param->offset -= sizeof(SSubmitBlk);
|
||||
}
|
||||
}
|
||||
|
||||
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(dataBuf->pData);
|
||||
SSubmitBlk *pBlocks = (SSubmitBlk *)(dataBuf->pData);
|
||||
tsSetBlockInfo(pBlocks, pTableMeta, numOfRows);
|
||||
|
||||
dataBuf->vgId = pTableMeta->vgId;
|
||||
|
@ -1141,7 +1143,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
|
|||
STableDataBlocks *pDataBlock = NULL;
|
||||
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
||||
|
||||
int32_t ret = tscCreateDataBlock(PATH_MAX, tinfo.rowSize, sizeof(SShellSubmitBlock), pTableMetaInfo->name,
|
||||
int32_t ret = tscCreateDataBlock(PATH_MAX, tinfo.rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name,
|
||||
pTableMeta, &pDataBlock);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
goto _error_clean;
|
||||
|
@ -1353,7 +1355,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
|
|||
assert(pCmd->numOfClause == 1);
|
||||
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
|
||||
|
||||
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pTableDataBlocks->pData);
|
||||
SSubmitBlk *pBlocks = (SSubmitBlk *)(pTableDataBlocks->pData);
|
||||
tsSetBlockInfo(pBlocks, pTableMeta, numOfRows);
|
||||
|
||||
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1394,7 +1396,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
|
|||
|
||||
pCmd->pDataBlocks = tscCreateBlockArrayList();
|
||||
STableDataBlocks *pTableDataBlock = NULL;
|
||||
int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, rowSize, sizeof(SShellSubmitBlock),
|
||||
int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, rowSize, sizeof(SSubmitBlk),
|
||||
pTableMetaInfo->name, pTableMeta, &pTableDataBlock);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return -1;
|
||||
|
@ -1435,7 +1437,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
|
|||
}
|
||||
|
||||
pTableDataBlock = pCmd->pDataBlocks->pData[0];
|
||||
pTableDataBlock->size = sizeof(SShellSubmitBlock);
|
||||
pTableDataBlock->size = sizeof(SSubmitBlk);
|
||||
pTableDataBlock->rowSize = tinfo.rowSize;
|
||||
|
||||
numOfRows += pSql->res.numOfRows;
|
||||
|
|
|
@ -325,12 +325,12 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
|
|||
|
||||
for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) {
|
||||
STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i];
|
||||
uint32_t totalDataSize = pBlock->size - sizeof(SShellSubmitBlock);
|
||||
uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk);
|
||||
uint32_t dataSize = totalDataSize / alloced;
|
||||
assert(dataSize * alloced == totalDataSize);
|
||||
|
||||
if (alloced == binded) {
|
||||
totalDataSize += dataSize + sizeof(SShellSubmitBlock);
|
||||
totalDataSize += dataSize + sizeof(SSubmitBlk);
|
||||
if (totalDataSize > pBlock->nAllocSize) {
|
||||
const double factor = 1.5;
|
||||
void* tmp = realloc(pBlock->pData, (uint32_t)(totalDataSize * factor));
|
||||
|
@ -342,7 +342,7 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
|
|||
}
|
||||
}
|
||||
|
||||
char* data = pBlock->pData + sizeof(SShellSubmitBlock) + dataSize * binded;
|
||||
char* data = pBlock->pData + sizeof(SSubmitBlk) + dataSize * binded;
|
||||
for (uint32_t j = 0; j < pBlock->numOfParams; ++j) {
|
||||
SParamInfo* param = pBlock->params + j;
|
||||
int code = doBindParam(data, param, bind + param->idx);
|
||||
|
@ -365,10 +365,10 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
|
|||
for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) {
|
||||
STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i];
|
||||
|
||||
uint32_t totalDataSize = pBlock->size - sizeof(SShellSubmitBlock);
|
||||
uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk);
|
||||
pBlock->size += totalDataSize / alloced;
|
||||
|
||||
SShellSubmitBlock* pSubmit = (SShellSubmitBlock*)pBlock->pData;
|
||||
SSubmitBlk* pSubmit = (SSubmitBlk*)pBlock->pData;
|
||||
pSubmit->numOfRows += pSubmit->numOfRows / alloced;
|
||||
}
|
||||
|
||||
|
@ -398,10 +398,10 @@ static int insertStmtReset(STscStmt* pStmt) {
|
|||
for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) {
|
||||
STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i];
|
||||
|
||||
uint32_t totalDataSize = pBlock->size - sizeof(SShellSubmitBlock);
|
||||
pBlock->size = sizeof(SShellSubmitBlock) + totalDataSize / alloced;
|
||||
uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk);
|
||||
pBlock->size = sizeof(SSubmitBlk) + totalDataSize / alloced;
|
||||
|
||||
SShellSubmitBlock* pSubmit = (SShellSubmitBlock*)pBlock->pData;
|
||||
SSubmitBlk* pSubmit = (SSubmitBlk*)pBlock->pData;
|
||||
pSubmit->numOfRows = pSubmit->numOfRows / alloced;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -58,7 +58,7 @@ SSchema *tscGetTableSchema(const STableMeta *pTableMeta) {
|
|||
return pSTableMeta->schema;
|
||||
}
|
||||
|
||||
return pTableMeta->schema;
|
||||
return (SSchema*) pTableMeta->schema;
|
||||
}
|
||||
|
||||
SSchema* tscGetTableTagSchema(const STableMeta* pTableMeta) {
|
||||
|
|
|
@ -508,14 +508,17 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
pRetrieveMsg->free = htons(pQueryInfo->type);
|
||||
pMsg += sizeof(pQueryInfo->type);
|
||||
|
||||
pSql->cmd.payloadLen = pMsg - pStart;
|
||||
pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE;
|
||||
pRetrieveMsg->header.vgId = htonl(1);
|
||||
pMsg += sizeof(SRetrieveTableMsg);
|
||||
|
||||
pRetrieveMsg->header.contLen = htonl(pSql->cmd.payloadLen);
|
||||
|
||||
pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) {
|
||||
//SShellSubmitMsg *pShellMsg;
|
||||
//SSubmitMsg *pShellMsg;
|
||||
//char * pMsg;
|
||||
//STableMetaInfo * pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
|
||||
|
||||
|
@ -524,14 +527,14 @@ void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) {
|
|||
//pMsg = buf + tsRpcHeadSize;
|
||||
|
||||
//TODO set iplist
|
||||
//pShellMsg = (SShellSubmitMsg *)pMsg;
|
||||
//pShellMsg = (SSubmitMsg *)pMsg;
|
||||
//pShellMsg->vnode = htons(pTableMeta->vpeerDesc[pSql->index].vnode);
|
||||
//tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pTableMeta->vpeerDesc[pSql->index].ip),
|
||||
// htons(pShellMsg->vnode));
|
||||
}
|
||||
|
||||
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||
SShellSubmitMsg *pShellMsg;
|
||||
SSubmitMsg *pShellMsg;
|
||||
char * pMsg, *pStart;
|
||||
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
||||
|
@ -540,23 +543,22 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
pStart = pSql->cmd.payload + tsRpcHeadSize;
|
||||
pMsg = pStart;
|
||||
|
||||
pShellMsg = (SShellSubmitMsg *)pMsg;
|
||||
SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg;
|
||||
pMsgDesc->numOfVnodes = htonl(1); //set the number of vnodes
|
||||
pMsg += sizeof(SMsgDesc);
|
||||
|
||||
pShellMsg->desc.numOfVnodes = htonl(1);
|
||||
|
||||
pShellMsg->import = htons(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT) ? 0 : 1);
|
||||
pShellMsg = (SSubmitMsg *)pMsg;
|
||||
pShellMsg->header.vgId = htonl(pTableMeta->vgId);
|
||||
pShellMsg->header.contLen = htonl(pSql->cmd.payloadLen);
|
||||
pShellMsg->length = pShellMsg->header.contLen;
|
||||
|
||||
pShellMsg->numOfTables = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted
|
||||
pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted
|
||||
|
||||
// pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here
|
||||
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
|
||||
|
||||
// tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pTableMeta->vpeerDesc[pTableMeta->index].ip),
|
||||
// htons(pShellMsg->vnode));
|
||||
|
||||
// pSql->cmd.payloadLen = sizeof(SShellSubmitMsg);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -147,7 +147,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
|
|||
}
|
||||
|
||||
// tsRpcHeaderSize will be updated during RPC initialization, so only after it initialization, this value is valid
|
||||
tsInsertHeadSize = tsRpcHeadSize + sizeof(SShellSubmitMsg);
|
||||
tsInsertHeadSize = tsRpcHeadSize + sizeof(SMsgDesc) + sizeof(SSubmitMsg);
|
||||
return pObj;
|
||||
}
|
||||
|
||||
|
|
|
@ -695,6 +695,49 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
|
||||
int32_t firstPartLen = 0;
|
||||
|
||||
STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
|
||||
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
|
||||
SSchema* pSchema = tscGetTableSchema(pTableMeta);
|
||||
|
||||
memcpy(pDataBlock, pTableDataBlock->pData, sizeof(SSubmitBlk));
|
||||
pDataBlock += sizeof(SSubmitBlk);
|
||||
|
||||
int32_t total = sizeof(int32_t)*2;
|
||||
for(int32_t i = 0; i < tinfo.numOfColumns; ++i) {
|
||||
switch (pSchema[i].type) {
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
case TSDB_DATA_TYPE_BINARY: {
|
||||
assert(0); // not support binary yet
|
||||
firstPartLen += sizeof(int32_t);break;
|
||||
}
|
||||
default:
|
||||
firstPartLen += tDataTypeDesc[pSchema[i].type].nSize;
|
||||
total += tDataTypeDesc[pSchema[i].type].nSize;
|
||||
}
|
||||
}
|
||||
|
||||
char* p = pTableDataBlock->pData + sizeof(SSubmitBlk);
|
||||
|
||||
SSubmitBlk* pBlock = pTableDataBlock->pData;
|
||||
int32_t rows = htons(pBlock->numOfRows);
|
||||
|
||||
for(int32_t i = 0; i < rows; ++i) {
|
||||
*(int32_t*) pDataBlock = total;
|
||||
pDataBlock += sizeof(int32_t);
|
||||
|
||||
*(int32_t*) pDataBlock = firstPartLen;
|
||||
pDataBlock += sizeof(int32_t);
|
||||
|
||||
memcpy(pDataBlock, p, pTableDataBlock->rowSize);
|
||||
|
||||
p += pTableDataBlock->rowSize;
|
||||
pDataBlock += pTableDataBlock->rowSize;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockList) {
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
|
||||
|
@ -716,7 +759,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
|
|||
return ret;
|
||||
}
|
||||
|
||||
int64_t destSize = dataBuf->size + pOneTableBlock->size;
|
||||
int64_t destSize = dataBuf->size + pOneTableBlock->size + pOneTableBlock->size*sizeof(int32_t)*2;
|
||||
if (dataBuf->nAllocSize < destSize) {
|
||||
while (dataBuf->nAllocSize < destSize) {
|
||||
dataBuf->nAllocSize = dataBuf->nAllocSize * 1.5;
|
||||
|
@ -730,29 +773,33 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
|
|||
tscError("%p failed to allocate memory for merging submit block, size:%d", pSql, dataBuf->nAllocSize);
|
||||
|
||||
taosHashCleanup(pVnodeDataBlockHashList);
|
||||
tfree(dataBuf->pData);
|
||||
tscDestroyBlockArrayList(pVnodeDataBlockList);
|
||||
tfree(dataBuf->pData);
|
||||
|
||||
return TSDB_CODE_CLI_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
SShellSubmitBlock* pBlocks = (SShellSubmitBlock*)pOneTableBlock->pData;
|
||||
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
|
||||
sortRemoveDuplicates(pOneTableBlock);
|
||||
|
||||
char* e = (char*)pBlocks->payLoad + pOneTableBlock->rowSize*(pBlocks->numOfRows-1);
|
||||
char* e = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1);
|
||||
|
||||
tscTrace("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId, pBlocks->sid,
|
||||
pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->payLoad), GET_INT64_VAL(e));
|
||||
tscTrace("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId,
|
||||
pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(e));
|
||||
|
||||
pBlocks->sid = htonl(pBlocks->sid);
|
||||
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + sizeof(int32_t) * 2);
|
||||
|
||||
pBlocks->tid = htonl(pBlocks->tid);
|
||||
pBlocks->uid = htobe64(pBlocks->uid);
|
||||
pBlocks->sversion = htonl(pBlocks->sversion);
|
||||
pBlocks->numOfRows = htons(pBlocks->numOfRows);
|
||||
|
||||
memcpy(dataBuf->pData + dataBuf->size, pOneTableBlock->pData, pOneTableBlock->size);
|
||||
pBlocks->len = htonl(len);
|
||||
|
||||
dataBuf->size += pOneTableBlock->size;
|
||||
// erase the empty space reserved for binary data
|
||||
trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock);
|
||||
dataBuf->size += (len + sizeof(SSubmitBlk));
|
||||
dataBuf->numOfTables += 1;
|
||||
}
|
||||
|
||||
|
|
|
@ -78,6 +78,8 @@ void dnodeRead(SRpcMsg *pMsg) {
|
|||
char *pCont = (char *) pMsg->pCont;
|
||||
SRpcContext *pRpcContext = NULL;
|
||||
|
||||
dTrace("dnode read msg disposal");
|
||||
|
||||
// SMsgDesc *pDesc = pCont;
|
||||
// pDesc->numOfVnodes = htonl(pDesc->numOfVnodes);
|
||||
// pCont += sizeof(SMsgDesc);
|
||||
|
@ -229,7 +231,8 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) {
|
|||
SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont;
|
||||
|
||||
SQInfo* pQInfo = NULL;
|
||||
int32_t code = qCreateQueryInfo(pQueryTableMsg, &pQInfo);
|
||||
void* tsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
|
||||
int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo);
|
||||
|
||||
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
|
||||
pRsp->code = code;
|
||||
|
@ -243,17 +246,17 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) {
|
|||
.msgType = 0
|
||||
};
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
|
||||
// do execute query
|
||||
qTableQuery(pQInfo);
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
}
|
||||
|
||||
static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
|
||||
SRetrieveTableMsg *pRetrieve = pMsg->pCont;
|
||||
void *pQInfo = htobe64(pRetrieve->qhandle);
|
||||
|
||||
dTrace("retrieve msg is disposed, qInfo:%p", pQInfo);
|
||||
dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId);
|
||||
|
||||
int32_t rowSize = 0;
|
||||
int32_t numOfRows = 0;
|
||||
|
@ -284,11 +287,12 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
|
|||
contLen = 100;
|
||||
|
||||
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen);
|
||||
pRsp->numOfRows = 0;
|
||||
pRsp->precision = 0;
|
||||
pRsp->offset = 0;
|
||||
pRsp->useconds = 0;
|
||||
pRsp->numOfRows = htonl(1);
|
||||
pRsp->precision = htons(0);
|
||||
pRsp->offset = htobe64(0);
|
||||
pRsp->useconds = htobe64(0);
|
||||
|
||||
// todo set the data
|
||||
*(int64_t*) pRsp->data = 1000;
|
||||
|
||||
rpcRsp = (SRpcMsg) {
|
||||
|
|
|
@ -276,7 +276,10 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) {
|
|||
pRsp->affectedRows = htonl(1);
|
||||
pRsp->numOfFailedBlocks = 0;
|
||||
|
||||
// todo write to tsdb
|
||||
void* tsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
|
||||
assert(tsdb != NULL);
|
||||
|
||||
tsdbInsertData(tsdb, pMsg->pCont);
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pMsg->rpcMsg.handle,
|
||||
|
@ -285,6 +288,7 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) {
|
|||
.code = 0,
|
||||
.msgType = 0
|
||||
};
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
}
|
||||
|
||||
|
|
|
@ -188,14 +188,14 @@ extern char *taosMsg[];
|
|||
|
||||
#pragma pack(push, 1)
|
||||
|
||||
typedef struct {
|
||||
int32_t vnode;
|
||||
int32_t sid;
|
||||
int32_t sversion;
|
||||
uint64_t uid;
|
||||
int16_t numOfRows;
|
||||
char payLoad[];
|
||||
} SShellSubmitBlock;
|
||||
//typedef struct {
|
||||
// int32_t vnode;
|
||||
// int32_t sid;
|
||||
// int32_t sversion;
|
||||
// uint64_t uid;
|
||||
// int16_t numOfRows;
|
||||
// char payLoad[];
|
||||
//} SShellSubmitBlock;
|
||||
|
||||
typedef struct {
|
||||
int32_t numOfVnodes;
|
||||
|
@ -206,13 +206,33 @@ typedef struct SMsgHead {
|
|||
int32_t vgId;
|
||||
} SMsgHead;
|
||||
|
||||
typedef struct {
|
||||
SMsgDesc desc;
|
||||
//typedef struct {
|
||||
// SMsgDesc desc;
|
||||
// SMsgHead header;
|
||||
// int16_t import;
|
||||
// int32_t numOfTables; // total number of sid
|
||||
// char blks[]; // number of data blocks, each table has at least one data block
|
||||
//} SShellSubmitMsg;
|
||||
|
||||
// Submit message for one table
|
||||
typedef struct SSubmitBlk {
|
||||
int64_t uid; // table unique id
|
||||
int32_t tid; // table id
|
||||
int32_t padding; // TODO just for padding here
|
||||
int32_t sversion; // data schema version
|
||||
int32_t len; // data part length, not including the SSubmitBlk head
|
||||
int16_t numOfRows; // total number of rows in current submit block
|
||||
char data[];
|
||||
} SSubmitBlk;
|
||||
|
||||
// Submit message for this TSDB
|
||||
typedef struct SSubmitMsg {
|
||||
SMsgHead header;
|
||||
int16_t import;
|
||||
int32_t numOfTables; // total number of sid
|
||||
char blks[]; // number of data blocks, each table has at least one data block
|
||||
} SShellSubmitMsg;
|
||||
int32_t length;
|
||||
int32_t compressed:2;
|
||||
int32_t numOfBlocks:30;
|
||||
SSubmitBlk blocks[];
|
||||
} SSubmitMsg;
|
||||
|
||||
typedef struct {
|
||||
int32_t index; // index of failed block in submit blocks
|
||||
|
@ -506,12 +526,14 @@ typedef struct {
|
|||
} SQueryTableRsp;
|
||||
|
||||
typedef struct {
|
||||
SMsgHead header;
|
||||
uint64_t qhandle;
|
||||
uint16_t free;
|
||||
} SRetrieveTableMsg;
|
||||
|
||||
typedef struct {
|
||||
typedef struct SRetrieveTableRsp {
|
||||
int32_t numOfRows;
|
||||
int8_t completed; // all results are returned to client
|
||||
int16_t precision;
|
||||
int64_t offset; // updated offset value for multi-vnode projection query
|
||||
int64_t useconds;
|
||||
|
|
|
@ -171,6 +171,7 @@ typedef struct SQueryRuntimeEnv {
|
|||
|
||||
typedef struct SQInfo {
|
||||
uint64_t signature;
|
||||
void* pVnode;
|
||||
TSKEY startTime;
|
||||
int64_t elapsedTime;
|
||||
SResultRec rec;
|
||||
|
@ -205,7 +206,7 @@ typedef struct SQInfo {
|
|||
* @param pQInfo
|
||||
* @return
|
||||
*/
|
||||
int32_t qCreateQueryInfo(SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo);
|
||||
int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo);
|
||||
|
||||
/**
|
||||
* query on single table
|
||||
|
|
|
@ -14,9 +14,9 @@
|
|||
*/
|
||||
#include "os.h"
|
||||
|
||||
#include "taosmsg.h"
|
||||
#include "hash.h"
|
||||
#include "hashfunc.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tlog.h"
|
||||
#include "tlosertree.h"
|
||||
#include "tscompression.h"
|
||||
|
@ -1482,7 +1482,8 @@ static void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool i
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, SColumnModel *pTagsSchema, int16_t order, bool isSTableQuery) {
|
||||
static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, SColumnModel *pTagsSchema, int16_t order,
|
||||
bool isSTableQuery) {
|
||||
dTrace("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv));
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
|
@ -2388,7 +2389,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
|
|||
taosArrayPush(cols, &pQuery->colList[i]);
|
||||
}
|
||||
|
||||
pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(&cond, sa, cols);
|
||||
pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(NULL, &cond, sa, cols);
|
||||
|
||||
// metric query do not invoke interpolation, it will be done at the second-stage merge
|
||||
if (!isPointInterpoQuery(pQuery)) {
|
||||
|
@ -2667,13 +2668,15 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||
|
||||
if (QUERY_IS_ASC_QUERY(pQuery)) {
|
||||
doGetAlignedIntervalQueryRangeImpl(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey, &skey1, &ekey1, &w);
|
||||
doGetAlignedIntervalQueryRangeImpl(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey,
|
||||
&skey1, &ekey1, &w);
|
||||
pWindowResInfo->startTime = w.skey;
|
||||
pWindowResInfo->prevSKey = w.skey;
|
||||
} else {
|
||||
// the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
|
||||
TSKEY winStart = blockInfo.window.ekey - pQuery->intervalTime;
|
||||
doGetAlignedIntervalQueryRangeImpl(pQuery, winStart, pQuery->window.ekey, blockInfo.window.ekey, &skey1, &ekey1, &w);
|
||||
doGetAlignedIntervalQueryRangeImpl(pQuery, winStart, pQuery->window.ekey, blockInfo.window.ekey, &skey1, &ekey1,
|
||||
&w);
|
||||
|
||||
pWindowResInfo->startTime = pQuery->window.skey;
|
||||
pWindowResInfo->prevSKey = w.skey;
|
||||
|
@ -2687,9 +2690,10 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
int32_t forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, &numOfRes,
|
||||
&pRuntimeEnv->windowResInfo, pDataBlock);
|
||||
|
||||
// dTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", fileId:%d, slot:%d, pos:%d, rows:%d, checked:%d",
|
||||
// GET_QINFO_ADDR(pQuery), blockInfo.window.skey, blockInfo.window.ekey, pQueryHandle->cur.fileId, pQueryHandle->cur.slot,
|
||||
// pQuery->pos, blockInfo.size, forwardStep);
|
||||
// dTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", fileId:%d, slot:%d, pos:%d, rows:%d,
|
||||
// checked:%d",
|
||||
// GET_QINFO_ADDR(pQuery), blockInfo.window.skey, blockInfo.window.ekey, pQueryHandle->cur.fileId,
|
||||
// pQueryHandle->cur.slot, pQuery->pos, blockInfo.size, forwardStep);
|
||||
|
||||
// save last access position
|
||||
cnt += forwardStep;
|
||||
|
@ -2997,7 +3001,8 @@ int32_t mergeMetersResultToOneGroups(SQInfo *pQInfo) {
|
|||
// dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pQInfo->subgroupIdx - 1);
|
||||
// }
|
||||
//
|
||||
// dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", GET_QINFO_ADDR(pQuery),
|
||||
// dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms",
|
||||
// GET_QINFO_ADDR(pQuery),
|
||||
// pQInfo->subgroupIdx - 1, pQInfo->pSidSet->numOfSubSet, taosGetTimestampMs() - st);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -4303,7 +4308,7 @@ void vnodePrintQueryStatistics(SQInfo *pQInfo) {
|
|||
#endif
|
||||
}
|
||||
|
||||
int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *param) {
|
||||
int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *param, void* tsdb) {
|
||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
|
@ -4337,7 +4342,8 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *param) {
|
|||
taosArrayPush(cols, &pQuery->colList[i]);
|
||||
}
|
||||
|
||||
pQInfo->runtimeEnv.pQueryHandle = tsdbQueryByTableId(&cond, pQInfo->pTableIdList, cols);
|
||||
|
||||
pQInfo->runtimeEnv.pQueryHandle = tsdbQueryByTableId(tsdb, &cond, pQInfo->pTableIdList, cols);
|
||||
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
pRuntimeEnv->pQuery = pQuery;
|
||||
|
@ -5278,7 +5284,8 @@ void qTableQuery(SQInfo* pQInfo) {
|
|||
pQInfo->rec.pointsRead += pQuery->rec.pointsRead;
|
||||
|
||||
// dTrace("QInfo:%p %d points returned %d points interpo, totalRead:%d totalInterpo:%d totalReturn:%d",
|
||||
// pQInfo, pQuery->pointsRead, numOfInterpo, pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned);
|
||||
// pQInfo, pQuery->pointsRead, numOfInterpo, pQInfo->pointsRead, pQInfo->pointsInterpo,
|
||||
// pQInfo->pointsReturned);
|
||||
sem_post(&pQInfo->dataReady);
|
||||
return;
|
||||
}
|
||||
|
@ -5299,9 +5306,10 @@ void qTableQuery(SQInfo* pQInfo) {
|
|||
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx);
|
||||
|
||||
if (pQuery->rec.pointsRead > 0) {
|
||||
// dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned %d from group results, totalRead:%d totalReturn:%d",
|
||||
// pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead,
|
||||
// pQInfo->pointsInterpo, pQInfo->pointsReturned);
|
||||
// dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned %d from group results, totalRead:%d
|
||||
// totalReturn:%d",
|
||||
// pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead,
|
||||
// pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned);
|
||||
|
||||
sem_post(&pQInfo->dataReady);
|
||||
return;
|
||||
|
@ -5309,7 +5317,8 @@ void qTableQuery(SQInfo* pQInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
// dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo, pMeterObj->vnode, pMeterObj->sid,
|
||||
// dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo, pMeterObj->vnode,
|
||||
// pMeterObj->sid,
|
||||
// pMeterObj->meterId, pQInfo->pointsRead);
|
||||
|
||||
// vnodePrintQueryStatistics(pSupporter);
|
||||
|
@ -5909,7 +5918,8 @@ static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs, SArray* pTableIdList) {
|
||||
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs,
|
||||
SArray *pTableIdList) {
|
||||
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
|
||||
if (pQInfo == NULL) {
|
||||
goto _clean_memory;
|
||||
|
@ -6005,7 +6015,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
|||
pQInfo->pTableIdList = pTableIdList;
|
||||
|
||||
pQuery->pos = -1;
|
||||
// dTrace("vid:%d sid:%d meterId:%s, QInfo is allocated:%p", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQInfo);
|
||||
// dTrace("vid:%d sid:%d meterId:%s, QInfo is allocated:%p", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId,
|
||||
// pQInfo);
|
||||
|
||||
return pQInfo;
|
||||
|
||||
|
@ -6112,7 +6123,7 @@ void vnodeFreeQInfo(SQInfo* pQInfo, bool decQueryRef) {
|
|||
}
|
||||
|
||||
static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs,
|
||||
SArray* pTableIdList, SQInfo **pQInfo) {
|
||||
SArray *pTableIdList, void* tsdb, SQInfo **pQInfo) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pSqlExprs, pTableIdList);
|
||||
|
@ -6124,10 +6135,6 @@ static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyE
|
|||
SQuery *pQuery = (*pQInfo)->runtimeEnv.pQuery;
|
||||
dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo);
|
||||
|
||||
// STableIdInfo **pTableIdList = (STableIdInfo **)pQueryMsg->pSidExtInfo;
|
||||
// if (pTableIdList != NULL && pTableIdList[0]->key > 0) {
|
||||
// pQuery->window.skey = pTableIdList[0]->key;
|
||||
// } else {
|
||||
pQuery->window.skey = pQueryMsg->window.skey;
|
||||
pQuery->window.ekey = pQueryMsg->window.ekey;
|
||||
|
||||
|
@ -6151,7 +6158,7 @@ static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyE
|
|||
tsBufNextPos(pTSBuf);
|
||||
}
|
||||
|
||||
if ((code = vnodeQueryTablePrepare(*pQInfo, pTSBuf)) != TSDB_CODE_SUCCESS) {
|
||||
if ((code = vnodeQueryTablePrepare(*pQInfo, pTSBuf, tsdb)) != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
|
@ -6170,7 +6177,7 @@ _error:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t qCreateQueryInfo(SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) {
|
||||
int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) {
|
||||
assert(pQueryTableMsg != NULL);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -6205,7 +6212,7 @@ int32_t qCreateQueryInfo(SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) {
|
|||
if (QUERY_IS_STABLE_QUERY(pQueryTableMsg->queryType)) {
|
||||
// pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryTableMsg, &code);
|
||||
} else {
|
||||
code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, pQInfo);
|
||||
code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, tsdb, pQInfo);
|
||||
}
|
||||
|
||||
_query_over:
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
#include "os.h"
|
||||
|
||||
#include "tlog.h"
|
||||
// #include "tsdb.h"
|
||||
#include "tskiplist.h"
|
||||
#include "tutil.h"
|
||||
|
||||
|
@ -395,6 +394,7 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) {
|
|||
SSkipListNode *px = pSkipList->pHead;
|
||||
SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
|
||||
|
||||
bool identical = false;
|
||||
for (int32_t i = pSkipList->level - 1; i >= 0; --i) {
|
||||
SSkipListNode *p = SL_GET_FORWARD_POINTER(px, i);
|
||||
while (p != NULL) {
|
||||
|
@ -402,11 +402,16 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) {
|
|||
char *newDatakey = SL_GET_NODE_KEY(pSkipList, pNode);
|
||||
|
||||
// if the forward element is less than the specified key, forward one step
|
||||
if (pSkipList->comparFn(key, newDatakey) < 0) {
|
||||
int32_t ret = pSkipList->comparFn(key, newDatakey);
|
||||
if (ret < 0) {
|
||||
px = p;
|
||||
|
||||
p = SL_GET_FORWARD_POINTER(px, i);
|
||||
} else {
|
||||
if (identical == false) {
|
||||
identical = (ret == 0);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -418,18 +423,13 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) {
|
|||
}
|
||||
|
||||
// if the skip list does not allowed identical key inserted, the new data will be discarded.
|
||||
if (pSkipList->keyInfo.dupKey == 0 && forward[0] != pSkipList->pHead) {
|
||||
char *key = SL_GET_NODE_KEY(pSkipList, forward[0]);
|
||||
char *pNewDataKey = SL_GET_NODE_KEY(pSkipList, pNode);
|
||||
|
||||
if (pSkipList->comparFn(key, pNewDataKey) == 0) {
|
||||
if (pSkipList->keyInfo.dupKey == 0 && identical) {
|
||||
if (pSkipList->lock) {
|
||||
pthread_rwlock_unlock(pSkipList->lock);
|
||||
}
|
||||
|
||||
return forward[0];
|
||||
}
|
||||
}
|
||||
|
||||
#if SKIP_LIST_RECORD_PERFORMANCE
|
||||
recordNodeEachLevel(pSkipList, level);
|
||||
|
|
|
@ -90,15 +90,6 @@ int tsdbCreateTable(tsdb_repo_t *repo, STableCfg *pCfg);
|
|||
int tsdbDropTable(tsdb_repo_t *pRepo, STableId tableId);
|
||||
int tsdbAlterTable(tsdb_repo_t *repo, STableCfg *pCfg);
|
||||
|
||||
// Submit message for one table
|
||||
typedef struct {
|
||||
STableId tableId;
|
||||
int32_t padding; // TODO just for padding here
|
||||
int32_t sversion; // data schema version
|
||||
int32_t len; // data part length, not including the SSubmitBlk head
|
||||
char data[];
|
||||
} SSubmitBlk;
|
||||
|
||||
typedef struct {
|
||||
int32_t totalLen;
|
||||
int32_t len;
|
||||
|
@ -108,15 +99,10 @@ typedef struct {
|
|||
int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
|
||||
SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
|
||||
|
||||
// Submit message for this TSDB
|
||||
typedef struct {
|
||||
int32_t length;
|
||||
int32_t compressed;
|
||||
SSubmitBlk blocks[];
|
||||
} SSubmitMsg;
|
||||
|
||||
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
|
||||
|
||||
struct STsdbRepo;
|
||||
|
||||
// SSubmitMsg Iterator
|
||||
typedef struct {
|
||||
int32_t totalLen;
|
||||
|
@ -245,7 +231,7 @@ typedef void *tsdbpos_t;
|
|||
* @param pTableList table sid list
|
||||
* @return
|
||||
*/
|
||||
tsdb_query_handle_t *tsdbQueryByTableId(STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo);
|
||||
tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo);
|
||||
|
||||
/**
|
||||
* move to next block
|
||||
|
|
|
@ -63,6 +63,26 @@ typedef struct {
|
|||
SFileGroup fGroup[];
|
||||
} STsdbFileH;
|
||||
|
||||
/**
|
||||
* if numOfSubBlocks == -1, then the SCompBlock is a sub-block
|
||||
* if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to
|
||||
* the data block offset and length
|
||||
* if numOfSubBlocks > 1, then the offset/len refer to the offset of the first sub-block in the
|
||||
* binary
|
||||
*/
|
||||
typedef struct {
|
||||
int64_t last : 1; // If the block in data file or last file
|
||||
int64_t offset : 63; // Offset of data block or sub-block index depending on numOfSubBlocks
|
||||
int32_t algorithm : 8; // Compression algorithm
|
||||
int32_t numOfPoints : 24; // Number of total points
|
||||
int32_t sversion; // Schema version
|
||||
int32_t len; // Data block length or nothing
|
||||
int16_t numOfSubBlocks; // Number of sub-blocks;
|
||||
int16_t numOfCols;
|
||||
TSKEY keyFirst;
|
||||
TSKEY keyLast;
|
||||
} SCompBlock;
|
||||
|
||||
#define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) < TSDB_FILE_TYPE_MAX)
|
||||
|
||||
STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock,
|
||||
|
|
|
@ -44,7 +44,7 @@ typedef struct {
|
|||
typedef struct STable {
|
||||
int8_t type;
|
||||
STableId tableId;
|
||||
int32_t superUid; // Super table UID
|
||||
int64_t superUid; // Super table UID
|
||||
int32_t sversion;
|
||||
STSchema * schema;
|
||||
STSchema * tagSchema;
|
||||
|
@ -98,6 +98,8 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta);
|
|||
#define TSDB_TABLE_OF_ID(pHandle, id) ((pHandle)->pTables)[id]
|
||||
#define TSDB_GET_TABLE_OF_NAME(pHandle, name) /* TODO */
|
||||
|
||||
STsdbMeta* tsdbGetMeta(tsdb_repo_t* pRepo);
|
||||
|
||||
int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg);
|
||||
int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId);
|
||||
STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId);
|
||||
|
|
|
@ -33,26 +33,6 @@ typedef struct {
|
|||
int64_t offset;
|
||||
} SCompIdx;
|
||||
|
||||
/**
|
||||
* if numOfSubBlocks == -1, then the SCompBlock is a sub-block
|
||||
* if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to
|
||||
* the data block offset and length
|
||||
* if numOfSubBlocks > 1, then the offset/len refer to the offset of the first sub-block in the
|
||||
* binary
|
||||
*/
|
||||
typedef struct {
|
||||
int64_t last : 1; // If the block in data file or last file
|
||||
int64_t offset : 63; // Offset of data block or sub-block index depending on numOfSubBlocks
|
||||
int32_t algorithm : 8; // Compression algorithm
|
||||
int32_t numOfPoints : 24; // Number of total points
|
||||
int32_t sversion; // Schema version
|
||||
int32_t len; // Data block length or nothing
|
||||
int16_t numOfSubBlocks; // Number of sub-blocks;
|
||||
int16_t numOfCols;
|
||||
TSKEY keyFirst;
|
||||
TSKEY keyLast;
|
||||
} SCompBlock;
|
||||
|
||||
typedef struct {
|
||||
int32_t delimiter; // For recovery usage
|
||||
int32_t checksum; // TODO: decide if checksum logic in this file or make it one API
|
||||
|
|
|
@ -499,6 +499,10 @@ SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
|
|||
int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
|
||||
if (pMsg == NULL || pIter == NULL) return -1;
|
||||
|
||||
pMsg->length = htonl(pMsg->length);
|
||||
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
||||
pMsg->compressed = htonl(pMsg->compressed);
|
||||
|
||||
pIter->totalLen = pMsg->length;
|
||||
pIter->len = TSDB_SUBMIT_MSG_HEAD_SIZE;
|
||||
if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) {
|
||||
|
@ -514,6 +518,14 @@ SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) {
|
|||
SSubmitBlk *pBlock = pIter->pBlock;
|
||||
if (pBlock == NULL) return NULL;
|
||||
|
||||
pBlock->len = htonl(pBlock->len);
|
||||
pBlock->numOfRows = htons(pBlock->numOfRows);
|
||||
pBlock->uid = htobe64(pBlock->uid);
|
||||
pBlock->tid = htonl(pBlock->tid);
|
||||
|
||||
pBlock->sversion = htonl(pBlock->sversion);
|
||||
pBlock->padding = htonl(pBlock->padding);
|
||||
|
||||
pIter->len = pIter->len + sizeof(SSubmitBlk) + pBlock->len;
|
||||
if (pIter->len >= pIter->totalLen) {
|
||||
pIter->pBlock = NULL;
|
||||
|
@ -524,6 +536,11 @@ SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) {
|
|||
return pBlock;
|
||||
}
|
||||
|
||||
STsdbMeta* tsdbGetMeta(tsdb_repo_t* pRepo) {
|
||||
STsdbRepo *tsdb = (STsdbRepo *)pRepo;
|
||||
return tsdb->tsdbMeta;
|
||||
}
|
||||
|
||||
// Check the configuration and set default options
|
||||
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
|
||||
// Check precision
|
||||
|
@ -695,6 +712,8 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
|
|||
tSkipListRandNodeInfo(pTable->mem->pData, &level, &headSize);
|
||||
|
||||
TSKEY key = dataRowKey(row);
|
||||
printf("insert:%lld, size:%d\n", key, pTable->mem->numOfPoints);
|
||||
|
||||
// Copy row into the memory
|
||||
SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row), key);
|
||||
if (pNode == NULL) {
|
||||
|
@ -715,7 +734,9 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
|
|||
tSkipListPut(pTable->mem->pData, pNode);
|
||||
if (key > pTable->mem->keyLast) pTable->mem->keyLast = key;
|
||||
if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key;
|
||||
pTable->mem->numOfPoints++;
|
||||
|
||||
pTable->mem->numOfPoints = tSkipListGetSize(pTable->mem->pData);
|
||||
// pTable->mem->numOfPoints++;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -723,7 +744,8 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
|
|||
static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
|
||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||
|
||||
STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, pBlock->tableId);
|
||||
STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid};
|
||||
STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, tableId);
|
||||
if (pTable == NULL) return -1;
|
||||
|
||||
SSubmitBlkIter blkIter;
|
||||
|
|
|
@ -14,53 +14,378 @@
|
|||
*/
|
||||
|
||||
#include "os.h"
|
||||
#include "tutil.h"
|
||||
|
||||
#include "tsdb.h"
|
||||
#include "tsdbFile.h"
|
||||
#include "tsdbMeta.h"
|
||||
|
||||
tsdb_query_handle_t *tsdbQueryByTableId(STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo) {
|
||||
#define EXTRA_BYTES 2
|
||||
#define PRIMARY_TSCOL_REQUIRED(c) (((SColumnInfoEx *)taosArrayGet(c, 0))->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)
|
||||
#define QUERY_IS_ASC_QUERY(o) (o == TSQL_SO_ASC)
|
||||
#define QH_GET_NUM_OF_COLS(handle) (taosArrayGetSize((handle)->pColumns))
|
||||
|
||||
typedef struct SField {
|
||||
// todo need the definition
|
||||
} SField;
|
||||
|
||||
typedef struct SHeaderFileInfo {
|
||||
int32_t fileId;
|
||||
} SHeaderFileInfo;
|
||||
|
||||
typedef struct SQueryHandlePos {
|
||||
int32_t fileId;
|
||||
int32_t slot;
|
||||
int32_t pos;
|
||||
int32_t fileIndex;
|
||||
} SQueryHandlePos;
|
||||
|
||||
typedef struct SDataBlockLoadInfo {
|
||||
int32_t fileListIndex;
|
||||
int32_t fileId;
|
||||
int32_t slotIdx;
|
||||
int32_t sid;
|
||||
SArray *pLoadedCols;
|
||||
} SDataBlockLoadInfo;
|
||||
|
||||
typedef struct SLoadCompBlockInfo {
|
||||
int32_t sid; /* meter sid */
|
||||
int32_t fileId;
|
||||
int32_t fileListIndex;
|
||||
} SLoadCompBlockInfo;
|
||||
|
||||
typedef struct SQueryFilesInfo {
|
||||
SArray *pFileInfo;
|
||||
int32_t current; // the memory mapped header file, NOTE: only one header file can be mmap.
|
||||
int32_t vnodeId;
|
||||
|
||||
int32_t headerFd; // header file fd
|
||||
int64_t headerFileSize;
|
||||
int32_t dataFd;
|
||||
int32_t lastFd;
|
||||
|
||||
char headerFilePath[PATH_MAX]; // current opened header file name
|
||||
char dataFilePath[PATH_MAX]; // current opened data file name
|
||||
char lastFilePath[PATH_MAX]; // current opened last file path
|
||||
char dbFilePathPrefix[PATH_MAX];
|
||||
} SQueryFilesInfo;
|
||||
|
||||
typedef struct STableQueryRec {
|
||||
TSKEY lastKey;
|
||||
STable * pTableObj;
|
||||
int64_t offsetInHeaderFile;
|
||||
int32_t numOfBlocks;
|
||||
int32_t start;
|
||||
SCompBlock *pBlock;
|
||||
} STableQueryRec;
|
||||
|
||||
typedef struct {
|
||||
SCompBlock *compBlock;
|
||||
SField * fields;
|
||||
} SCompBlockFields;
|
||||
|
||||
typedef struct STableDataBlockInfoEx {
|
||||
SCompBlockFields pBlock;
|
||||
STableQueryRec * pMeterDataInfo;
|
||||
int32_t blockIndex;
|
||||
int32_t groupIdx; /* number of group is less than the total number of meters */
|
||||
} STableDataBlockInfoEx;
|
||||
|
||||
typedef struct STsdbQueryHandle {
|
||||
struct STsdbRepo* pTsdb;
|
||||
|
||||
SQueryHandlePos cur; // current position
|
||||
SQueryHandlePos start; // the start position, used for secondary/third iteration
|
||||
int32_t unzipBufSize;
|
||||
char *unzipBuffer;
|
||||
char *secondaryUnzipBuffer;
|
||||
|
||||
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
|
||||
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */
|
||||
|
||||
SQueryFilesInfo vnodeFileInfo;
|
||||
|
||||
int16_t numOfRowsPerPage;
|
||||
uint16_t flag; // denotes reversed scan of data or not
|
||||
int16_t order;
|
||||
STimeWindow window; // the primary query time window that applies to all queries
|
||||
TSKEY lastKey;
|
||||
int32_t blockBufferSize;
|
||||
SCompBlock *pBlock;
|
||||
int32_t numOfBlocks;
|
||||
SField ** pFields;
|
||||
SArray * pColumns; // column list, SColumnInfoEx array list
|
||||
SArray * pTableIdList; // table id object list
|
||||
bool locateStart;
|
||||
int32_t realNumOfRows;
|
||||
bool loadDataAfterSeek; // load data after seek.
|
||||
|
||||
STableDataBlockInfoEx *pDataBlockInfoEx;
|
||||
STableQueryRec * pTableQueryInfo;
|
||||
int32_t tableIndex;
|
||||
bool isFirstSlot;
|
||||
void * qinfo; // query info handle, for debug purpose
|
||||
} STsdbQueryHandle;
|
||||
|
||||
int32_t doAllocateBuf(STsdbQueryHandle *pQueryHandle, int32_t rowsPerFileBlock) {
|
||||
// record the maximum column width among columns of this meter/metric
|
||||
SColumnInfoEx *pColumn = taosArrayGet(pQueryHandle->pColumns, 0);
|
||||
|
||||
int32_t maxColWidth = pColumn->info.bytes;
|
||||
for (int32_t i = 1; i < QH_GET_NUM_OF_COLS(pQueryHandle); ++i) {
|
||||
int32_t bytes = pColumn[i].info.bytes;
|
||||
if (bytes > maxColWidth) {
|
||||
maxColWidth = bytes;
|
||||
}
|
||||
}
|
||||
|
||||
// only one unzip buffer required, since we can unzip each column one by one
|
||||
pQueryHandle->unzipBufSize = (size_t)(maxColWidth * rowsPerFileBlock + EXTRA_BYTES); // plus extra_bytes
|
||||
pQueryHandle->unzipBuffer = (char *)calloc(1, pQueryHandle->unzipBufSize);
|
||||
|
||||
pQueryHandle->secondaryUnzipBuffer = (char *)calloc(1, pQueryHandle->unzipBufSize);
|
||||
|
||||
if (pQueryHandle->unzipBuffer == NULL || pQueryHandle->secondaryUnzipBuffer == NULL) {
|
||||
goto _error_clean;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_error_clean:
|
||||
tfree(pQueryHandle->unzipBuffer);
|
||||
tfree(pQueryHandle->secondaryUnzipBuffer);
|
||||
|
||||
return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
static void initQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) {
|
||||
pVnodeFilesInfo->current = -1;
|
||||
pVnodeFilesInfo->headerFileSize = -1;
|
||||
|
||||
pVnodeFilesInfo->headerFd = FD_INITIALIZER; // set the initial value
|
||||
pVnodeFilesInfo->dataFd = FD_INITIALIZER;
|
||||
pVnodeFilesInfo->lastFd = FD_INITIALIZER;
|
||||
}
|
||||
|
||||
static void vnodeInitDataBlockLoadInfo(SDataBlockLoadInfo *pBlockLoadInfo) {
|
||||
pBlockLoadInfo->slotIdx = -1;
|
||||
pBlockLoadInfo->fileId = -1;
|
||||
pBlockLoadInfo->sid = -1;
|
||||
pBlockLoadInfo->fileListIndex = -1;
|
||||
}
|
||||
|
||||
static void vnodeInitCompBlockLoadInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) {
|
||||
pCompBlockLoadInfo->sid = -1;
|
||||
pCompBlockLoadInfo->fileId = -1;
|
||||
pCompBlockLoadInfo->fileListIndex = -1;
|
||||
}
|
||||
|
||||
static int fileOrderComparFn(const void *p1, const void *p2) {
|
||||
SHeaderFileInfo *pInfo1 = (SHeaderFileInfo *)p1;
|
||||
SHeaderFileInfo *pInfo2 = (SHeaderFileInfo *)p2;
|
||||
|
||||
if (pInfo1->fileId == pInfo2->fileId) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return (pInfo1->fileId > pInfo2->fileId) ? 1 : -1;
|
||||
}
|
||||
|
||||
void vnodeRecordAllFiles(int32_t vnodeId, SQueryFilesInfo *pVnodeFilesInfo) {
|
||||
char suffix[] = ".head";
|
||||
pVnodeFilesInfo->pFileInfo = taosArrayInit(4, sizeof(int32_t));
|
||||
|
||||
struct dirent *pEntry = NULL;
|
||||
pVnodeFilesInfo->vnodeId = vnodeId;
|
||||
char* tsDirectory = "";
|
||||
|
||||
sprintf(pVnodeFilesInfo->dbFilePathPrefix, "%s/vnode%d/db/", tsDirectory, vnodeId);
|
||||
DIR *pDir = opendir(pVnodeFilesInfo->dbFilePathPrefix);
|
||||
if (pDir == NULL) {
|
||||
// dError("QInfo:%p failed to open directory:%s, %s", pQInfo, pVnodeFilesInfo->dbFilePathPrefix,
|
||||
// strerror(errno));
|
||||
return;
|
||||
}
|
||||
|
||||
while ((pEntry = readdir(pDir)) != NULL) {
|
||||
if ((pEntry->d_name[0] == '.' && pEntry->d_name[1] == '\0') || (strcmp(pEntry->d_name, "..") == 0)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pEntry->d_type & DT_DIR) {
|
||||
continue;
|
||||
}
|
||||
|
||||
size_t len = strlen(pEntry->d_name);
|
||||
if (strcasecmp(&pEntry->d_name[len - 5], suffix) != 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t vid = 0;
|
||||
int32_t fid = 0;
|
||||
sscanf(pEntry->d_name, "v%df%d", &vid, &fid);
|
||||
if (vid != vnodeId) { /* ignore error files */
|
||||
// dError("QInfo:%p error data file:%s in vid:%d, ignore", pQInfo, pEntry->d_name, vnodeId);
|
||||
continue;
|
||||
}
|
||||
|
||||
// int32_t firstFid = pVnode->fileId - pVnode->numOfFiles + 1;
|
||||
// if (fid > pVnode->fileId || fid < firstFid) {
|
||||
// dError("QInfo:%p error data file:%s in vid:%d, fid:%d, fid range:%d-%d", pQInfo, pEntry->d_name, vnodeId,
|
||||
// fid, firstFid, pVnode->fileId);
|
||||
// continue;
|
||||
// }
|
||||
|
||||
assert(fid >= 0 && vid >= 0);
|
||||
taosArrayPush(pVnodeFilesInfo->pFileInfo, &fid);
|
||||
}
|
||||
|
||||
closedir(pDir);
|
||||
|
||||
// dTrace("QInfo:%p find %d data files in %s to be checked", pQInfo, pVnodeFilesInfo->numOfFiles,
|
||||
// pVnodeFilesInfo->dbFilePathPrefix);
|
||||
|
||||
// order the files information according their names */
|
||||
size_t numOfFiles = taosArrayGetSize(pVnodeFilesInfo->pFileInfo);
|
||||
qsort(pVnodeFilesInfo->pFileInfo->pData, numOfFiles, sizeof(SHeaderFileInfo), fileOrderComparFn);
|
||||
}
|
||||
|
||||
tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo) {
|
||||
// todo 1. filter not exist table
|
||||
|
||||
// todo 2. add the reference count for each table that is involved in query
|
||||
|
||||
STsdbQueryHandle *pQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
|
||||
pQueryHandle->order = pCond->order;
|
||||
pQueryHandle->window = pCond->twindow;
|
||||
pQueryHandle->pTsdb = tsdb;
|
||||
|
||||
pQueryHandle->pTableIdList = idList;
|
||||
pQueryHandle->pColumns = pColumnInfo;
|
||||
pQueryHandle->loadDataAfterSeek = false;
|
||||
pQueryHandle->isFirstSlot = true;
|
||||
|
||||
pQueryHandle->lastKey = pQueryHandle->window.skey; // ascending query
|
||||
|
||||
// malloc buffer in order to load data from file
|
||||
int32_t numOfCols = taosArrayGetSize(pColumnInfo);
|
||||
size_t bufferCapacity = 4096;
|
||||
|
||||
pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoEx));
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoEx *pCol = taosArrayGet(pColumnInfo, i);
|
||||
SColumnInfoEx pDest = {{0}, 0};
|
||||
|
||||
pDest.pData = calloc(1, EXTRA_BYTES + bufferCapacity * pCol->info.bytes);
|
||||
pDest.info = pCol->info;
|
||||
taosArrayPush(pQueryHandle->pColumns, &pDest);
|
||||
}
|
||||
|
||||
if (doAllocateBuf(pQueryHandle, bufferCapacity) != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
initQueryFileInfoFD(&pQueryHandle->vnodeFileInfo);
|
||||
vnodeInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
|
||||
vnodeInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo);
|
||||
|
||||
int32_t vnodeId = 1;
|
||||
vnodeRecordAllFiles(vnodeId, &pQueryHandle->vnodeFileInfo);
|
||||
|
||||
return (tsdb_query_handle_t)pQueryHandle;
|
||||
}
|
||||
|
||||
static int32_t next = 1;
|
||||
bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) {
|
||||
if (next == 0) {
|
||||
return false;
|
||||
} else {
|
||||
next = 0;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead,
|
||||
TSKEY* skey, TSKEY* ekey, STsdbQueryHandle* pHandle) {
|
||||
int numOfRows = 0;
|
||||
int32_t numOfCols = taosArrayGetSize(pHandle->pColumns);
|
||||
*skey = INT64_MIN;
|
||||
|
||||
while(tSkipListIterNext(pIter)) {
|
||||
SSkipListNode *node = tSkipListIterGet(pIter);
|
||||
if (node == NULL) break;
|
||||
|
||||
SDataRow row = SL_GET_NODE_DATA(node);
|
||||
if (dataRowKey(row) > maxKey) break;
|
||||
// Convert row data to column data
|
||||
|
||||
if (*skey == INT64_MIN) {
|
||||
*skey = dataRowKey(row);
|
||||
}
|
||||
|
||||
*ekey = dataRowKey(row);
|
||||
|
||||
int32_t offset = 0;
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoEx* pColInfo = taosArrayGet(pHandle->pColumns, 0);
|
||||
memcpy(pColInfo->pData + numOfRows*pColInfo->info.bytes, dataRowTuple(row) + offset, pColInfo->info.bytes);
|
||||
offset += pColInfo->info.bytes;
|
||||
}
|
||||
|
||||
numOfRows++;
|
||||
if (numOfRows > maxRowsToRead) break;
|
||||
};
|
||||
|
||||
return numOfRows;
|
||||
}
|
||||
|
||||
// copy data from cache into data block
|
||||
SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
|
||||
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
|
||||
STableIdInfo* idInfo = taosArrayGet(pHandle->pTableIdList, 0);
|
||||
|
||||
STableId tableId = {.uid = idInfo->uid, .tid = idInfo->sid};
|
||||
STable *pTable = tsdbIsValidTableToInsert(tsdbGetMeta(pHandle->pTsdb), tableId);
|
||||
assert(pTable != NULL);
|
||||
|
||||
TSKEY skey = 0, ekey = 0;
|
||||
int32_t rows = 0;
|
||||
|
||||
if (pTable->mem != NULL) {
|
||||
SSkipListIterator* iter = tSkipListCreateIter(pTable->mem->pData);
|
||||
rows = tsdbReadRowsFromCache(iter, INT64_MAX, 4000, &skey, &ekey, pHandle);
|
||||
}
|
||||
|
||||
int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SDataStatis **pBlockStatis) {
|
||||
SDataBlockInfo blockInfo = {
|
||||
.uid = tableId.uid,
|
||||
.sid = tableId.tid,
|
||||
.size = rows,
|
||||
.window = {.skey = skey, .ekey = ekey}
|
||||
};
|
||||
|
||||
return blockInfo;
|
||||
}
|
||||
|
||||
// return null for data block in cache
|
||||
int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SDataStatis **pBlockStatis) {
|
||||
*pBlockStatis = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList) {
|
||||
|
||||
}
|
||||
|
||||
int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, STimeWindow* window, tsdbpos_t position, int16_t order) {
|
||||
int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, STimeWindow *window, tsdbpos_t position, int16_t order) {}
|
||||
|
||||
}
|
||||
int32_t tsdbDataBlockSeek(tsdb_query_handle_t *pQueryHandle, tsdbpos_t pos) {}
|
||||
|
||||
int32_t tsdbDataBlockSeek(tsdb_query_handle_t *pQueryHandle, tsdbpos_t pos) {
|
||||
tsdbpos_t tsdbDataBlockTell(tsdb_query_handle_t *pQueryHandle) { return NULL; }
|
||||
|
||||
}
|
||||
SArray *tsdbRetrieveDataRow(tsdb_query_handle_t *pQueryHandle, SArray *pIdList, SQueryRowCond *pCond) {}
|
||||
|
||||
tsdbpos_t tsdbDataBlockTell(tsdb_query_handle_t *pQueryHandle) {
|
||||
return NULL;
|
||||
}
|
||||
tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stableId, const char *pTagFilterStr) {}
|
||||
|
||||
SArray *tsdbRetrieveDataRow(tsdb_query_handle_t *pQueryHandle, SArray *pIdList, SQueryRowCond *pCond) {
|
||||
|
||||
}
|
||||
|
||||
tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stableId, const char *pTagFilterStr) {
|
||||
|
||||
}
|
||||
|
||||
STableIDList *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle) {
|
||||
|
||||
}
|
||||
|
||||
STableIDList *tsdbQueryTableList(int16_t stableId, const char *pTagCond) {
|
||||
|
||||
}
|
||||
STableIDList *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle) {}
|
||||
|
||||
STableIDList *tsdbQueryTableList(int16_t stableId, const char *pTagCond) {}
|
||||
|
|
Loading…
Reference in New Issue