feature/scheduler
This commit is contained in:
parent
7db189251d
commit
260aad808b
|
@ -1375,7 +1375,9 @@ typedef struct {
|
||||||
SArray* pArray;
|
SArray* pArray;
|
||||||
} SVCreateTbBatchReq;
|
} SVCreateTbBatchReq;
|
||||||
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
SArray* rspList; // SArray<SVCreateTbRsp>
|
||||||
} SVCreateTbBatchRsp;
|
} SVCreateTbBatchRsp;
|
||||||
|
|
||||||
int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq);
|
int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq);
|
||||||
|
|
|
@ -193,7 +193,7 @@ void vnodeOptionsInit(SVnodeCfg *pOptions);
|
||||||
*/
|
*/
|
||||||
void vnodeOptionsClear(SVnodeCfg *pOptions);
|
void vnodeOptionsClear(SVnodeCfg *pOptions);
|
||||||
|
|
||||||
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableName);
|
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName);
|
||||||
|
|
||||||
|
|
||||||
/* ------------------------ FOR COMPILE ------------------------ */
|
/* ------------------------ FOR COMPILE ------------------------ */
|
||||||
|
|
|
@ -34,19 +34,22 @@ void vnodeOptionsCopy(SVnodeCfg *pDest, const SVnodeCfg *pSrc) {
|
||||||
memcpy((void *)pDest, (void *)pSrc, sizeof(SVnodeCfg));
|
memcpy((void *)pDest, (void *)pSrc, sizeof(SVnodeCfg));
|
||||||
}
|
}
|
||||||
|
|
||||||
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableName) {
|
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName) {
|
||||||
uint32_t hashValue = 0;
|
uint32_t hashValue = 0;
|
||||||
|
|
||||||
switch (pVnodeOptions->hashMethod) {
|
switch (pVnodeOptions->hashMethod) {
|
||||||
default:
|
default:
|
||||||
hashValue = MurmurHash3_32(tableName, strlen(tableName));
|
hashValue = MurmurHash3_32(tableFName, strlen(tableFName));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO OPEN THIS !!!!!!!
|
||||||
|
#if 1
|
||||||
if (hashValue < pVnodeOptions->hashBegin || hashValue > pVnodeOptions->hashEnd) {
|
if (hashValue < pVnodeOptions->hashBegin || hashValue > pVnodeOptions->hashEnd) {
|
||||||
terrno = TSDB_CODE_VND_HASH_MISMATCH;
|
terrno = TSDB_CODE_VND_HASH_MISMATCH;
|
||||||
return TSDB_CODE_VND_HASH_MISMATCH;
|
return TSDB_CODE_VND_HASH_MISMATCH;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,10 +77,36 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
}
|
}
|
||||||
case TDMT_VND_CREATE_TABLE: {
|
case TDMT_VND_CREATE_TABLE: {
|
||||||
SVCreateTbBatchReq vCreateTbBatchReq = {0};
|
SVCreateTbBatchReq vCreateTbBatchReq = {0};
|
||||||
|
SVCreateTbBatchRsp vCreateTbBatchRsp = {0};
|
||||||
tDeserializeSVCreateTbBatchReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq);
|
tDeserializeSVCreateTbBatchReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq);
|
||||||
for (int i = 0; i < taosArrayGetSize(vCreateTbBatchReq.pArray); i++) {
|
int reqNum = taosArrayGetSize(vCreateTbBatchReq.pArray);
|
||||||
|
for (int i = 0; i < reqNum; i++) {
|
||||||
SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i);
|
SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i);
|
||||||
|
|
||||||
|
// TODO OPEN THIS
|
||||||
|
#if 0
|
||||||
|
char tableFName[TSDB_TABLE_FNAME_LEN];
|
||||||
|
tNameExtractFullName(&pCreateTbReq->name, tableFName);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
int32_t code = vnodeValidateTableHash(&pVnode->config, tableFName);
|
||||||
|
if (code) {
|
||||||
|
SVCreateTbRsp rsp;
|
||||||
|
rsp.code = code;
|
||||||
|
memcpy(rsp.tableName, pCreateTbReq->name, sizeof(rsp.tableName));
|
||||||
|
|
||||||
|
if (NULL == vCreateTbBatchRsp.rspList) {
|
||||||
|
vCreateTbBatchRsp.rspList = taosArrayInit(reqNum - i, sizeof(SVCreateTbRsp));
|
||||||
|
if (NULL == vCreateTbBatchRsp.rspList) {
|
||||||
|
vError("vgId:%d, failed to init array: %d", reqNum - i);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(vCreateTbBatchRsp.rspList, &rsp);
|
||||||
|
}
|
||||||
|
|
||||||
if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) {
|
if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
vError("vgId:%d, failed to create table: %s", pVnode->vgId, pCreateTbReq->name);
|
vError("vgId:%d, failed to create table: %s", pVnode->vgId, pCreateTbReq->name);
|
||||||
|
@ -98,6 +124,9 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
|
|
||||||
vTrace("vgId:%d process create %" PRIzu " tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray));
|
vTrace("vgId:%d process create %" PRIzu " tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray));
|
||||||
taosArrayDestroy(vCreateTbBatchReq.pArray);
|
taosArrayDestroy(vCreateTbBatchReq.pArray);
|
||||||
|
if (vCreateTbBatchRsp.rspList) {
|
||||||
|
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TDMT_VND_ALTER_STB: {
|
case TDMT_VND_ALTER_STB: {
|
||||||
|
|
Loading…
Reference in New Issue