当前位置:

Spring程序使用三种方式在Apache Ignite存取数据

访客 2024-04-24 604 0


本文基于分布式内存数据库ignite的2.12.0版本,展示使用自动序列化、BinaryObjectBuilder、sql三种方式存取数据的demo

创建数据表(cache)

准备集群,集群server节点需要使用四个端口:
10800(JDBC/ODBC),11211(TCP connector),47100(listener),47500(discovery)

<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring configuration of one Ignite server node (local address 10.0.0.1). -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd">

    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="igniteInstanceName" value="test-grid-xx1-yy1"/>
        <property name="userAttributes">
            <map>
                <entry key="rack" value="xx1"/>
            </map>
        </property>

        <!-- Enable peer class loading. -->
        <property name="peerClassLoadingEnabled" value="true"/>

        <!-- Set deployment mode. -->
        <property name="deploymentMode" value="ISOLATED"/>

        <property name="authenticationEnabled" value="true"/>

        <!-- Enable task execution events for examples. -->
        <property name="includeEventTypes">
            <list>
                <!-- Task execution events -->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>

                <!-- Cache events -->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
            </list>
        </property>

        <!-- SQL schemas made available on this node. -->
        <property name="sqlConfiguration">
            <bean class="org.apache.ignite.configuration.SqlConfiguration">
                <property name="sqlSchemas">
                    <list>
                        <value>DEV</value>
                        <value>SIT</value>
                        <value>UAT</value>
                        <value>VER</value>
                    </list>
                </property>
            </bean>
        </property>

        <property name="cacheConfiguration">
            <list>
                <!-- Partitioned cache for Persons data. -->
                <bean class="org.apache.ignite.configuration.CacheConfiguration">
                    <property name="name" value="test"/>
                    <property name="backups" value="1"/>
                    <property name="cacheMode" value="PARTITIONED"/>
                    <property name="atomicityMode" value="TRANSACTIONAL"/>
                    <!-- Group the cache belongs to. -->
                    <property name="groupName" value="DEV"/>
                </bean>
            </list>
        </property>

        <property name="dataStorageConfiguration">
            <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
                <!--
                    Default memory region. A cache is bound to this memory region
                    unless it sets another one in its CacheConfiguration.
                -->
                <property name="defaultDataRegionConfiguration">
                    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                        <property name="name" value="Default_Region"/>
                        <property name="persistenceEnabled" value="true"/>
                        <!-- Initial size is 100 MB; maximum size is 100 GB. -->
                        <property name="initialSize" value="#{100L*1024*1024}"/>
                        <property name="maxSize" value="#{100L*1024*1024*1024}"/>
                        <!-- Enabling SEGMENTED_LRU page replacement for this region. -->
                        <property name="pageReplacementMode" value="SEGMENTED_LRU"/>
                        <property name="metricsEnabled" value="true"/>
                        <property name="warmUpConfiguration">
                            <bean class="org.apache.ignite.configuration.LoadAllWarmUpConfiguration"/>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>

        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes from the first cluster. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="localAddress" value="10.0.0.1"/>
                <property name="localPort" value="47500"/>
                <property name="localPortRange" value="20"/>
                <property name="ackTimeout" value="#{3L*1000}"/>
                <property name="reconnectDelay" value="2000"/>
                <property name="reconnectCount" value="5"/>
                <property name="connectionRecoveryTimeout" value="#{60L*1000}"/>
                <!-- Setting up IP finder for this cluster -->
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="shared" value="true"/>
                        <property name="addresses">
                            <list>
                                <value>10.0.0.2:47500..47520</value>
                                <value>10.0.0.3:47500..47520</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>

        <!-- Explicitly configure TCP communication SPI changing local port number for the nodes from the first cluster. -->
        <property name="communicationSpi">
            <bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
                <property name="localPort" value="47100"/>
                <property name="localPortRange" value="20"/>
                <property name="connectTimeout" value="#{5L*1000}"/>
                <property name="reconnectCount" value="12"/>
                <property name="idleConnectionTimeout" value="#{60L*1000}"/>
                <property name="usePairedConnections" value="true"/>
                <property name="connectionsPerNode" value="3"/>
            </bean>
        </property>
    </bean>
</beans>

登录到ignite,创建用户

./sqlline.sh--verbose=true-u'jdbc:ignite:thin://10.0.0.1:10800,10.0.0.2:10800/SYS'#默认管理账户是ignite:ignite,一般需要创建自定义用户CREATEUSER"wzp"WITHPASSWORD'wzp';#查看常用的运维信息:https://ignite.apache.org/docs/latest/monitoring-metrics/system-viewsselect*fromSYS.NODES;select*fromSYS.CACHES;select*fromSYS.SCHEMAS;select*fromSYS.TABLES;select*fromSYS.TABLE_COLUMNS;select*fromSYS.BINARY_METADATA;

使用CREATE TABLE来创建cache,可以避免部署自定义类到server节点并支持使用二级索引;
table对应的cache的名称为SQL_<SCHEMA_NAME>_<TABLE_NAME>,即:SQL_DEV_PERSON

-- Person table in schema DEV; VALUE_TYPE names the value class used by the key-value API.
CREATE TABLE IF NOT EXISTS DEV.PERSON (
    id      int,
    city_id int,
    name    varchar,
    age     int,
    company varchar,
    PRIMARY KEY (id)
) WITH "TEMPLATE=PARTITIONED,CACHE_GROUP=DEV,BACKUPS=1,ATOMICITY=TRANSACTIONAL,VALUE_TYPE=com.wzp.ignite.bo.Person";

初始化springbean

@BeanpublicIgniteClientigniteClient(){ClientConfigurationcfg=newClientConfiguration();cfg.setAddresses("10.0.0.1:10800","10.0.0.2:10800").setUserName("wzp").setUserPassword("wzp");cfg.setPartitionAwarenessEnabled(false);cfg.setTransactionConfiguration(newClientTransactionConfiguration().setDefaultTxTimeout(10000).setDefaultTxConcurrency(TransactionConcurrency.OPTIMISTIC).setDefaultTxIsolation(TransactionIsolation.REPEATABLE_READ));returnIgnition.startClient(cfg);}@Bean("igniteTransactionManager")publicIgniteClientSpringTransactionManagerigniteTransactionManager(IgniteClientcli){IgniteClientSpringTransactionManagermgr=newIgniteClientSpringTransactionManager();mgr.setClientInstance(cli);returnmgr;}

使用自动序列化机制(Key-Value API)

ignite实质是key-value数据库,会自动使用BinaryObject格式把value(POJO对象)进行序列化后存储

packagecom.wzp.ignite.bo;@lombok.DatapublicclassPersonimplementsjava.io.Serializable{privatestaticfinallongserialVersionUID=1L;privateintid;privateStringname;privateintage;privateStringcompany;//即使配置@QuerySqlField也无法支持自动序列化时的驼峰转下划线:cityId->city_idprivateintcity_id;}

只要保证table定义里的VALUE_TYPE和POJO对象的类名一致,那么SQL也能查询到使用Key-Value API存入的POJO数据。如果使用复合主键,那么需要设置好table定义里的KEY_TYPE。

@AutowiredIgniteClientignite;privatevoidinsertObject(intkey){ClientCache<Integer,Person>cache=ignite.cache("SQL_"schemaName"_""PERSON");Personperson=newPerson();person.setId(key);person.setCity_id(1001);person.setAge(1);person.setName("Serializable");person.setCompany("MIDEA");cache.put(key,person);}privatePersongetObject(intkey){ClientCache<Integer,Person>cache=ignite.cache("SQL_"schemaName"_""PERSON");returncache.get(key);}

如需原子性地修改几行数据,在public方法上加上注解:
@Transactional(transactionManager = "igniteTransactionManager")

使用BinaryObjectBuilder(Key-Value API)

如果程序里不存在Person类的定义,也可以使用BinaryObjectBuilder操作Table表数据;
builder的名称需要与table定义里的VALUE_TYPE一致:

privatevoidinsertBinaryObject(intkey){ClientCache<Integer,BinaryObject>binaryCache=ignite.cache("SQL_"schemaName"_""PERSON").withKeepBinary();BinaryObjectBuilderbuilder=ignite.binary().builder("com.wzp.ignite.bo.Person");builder.setField("id",key);builder.setField("city_id",1001);builder.setField("name","BinaryObjectBuilder");builder.setField("age",30);builder.setField("company","MIDEA");binaryCache.put(key,builder.build());}privateBinaryObjectgetBinaryObject(intkey){ClientCache<Integer,BinaryObject>binaryCache=ignite.cache(cacheName).withKeepBinary();returnbinaryCache.get(key);}/***用key-valueapi操作使用复合主键的表*/privatevoidinsertBinaryObject(intuserId,Stringmonth){ClientCache<BinaryObject,BinaryObject>binaryCache=ignite.cache("SQL_"schemaName"_""SALLARY").withKeepBinary();//定义表时,指定KEY_TYPEBinaryObjectBuilderkeyBuilder=ignite.binary().builder("com.wzp.ignite.bo.SallaryKey");keyBuilder.setField("user_id",userId);keyBuilder.setField("month",month);BinaryObjectBuildervalueBuilder=ignite.binary().builder("com.wzp.ignite.bo.SallaryValue");valueBuilder.setField("salary_before_tax",100000L);valueBuilder.setField("tax",3L);binaryCache.put(keyBuilder.build(),valueBuilder.build());}

使用SQL(SQL API)

瘦客户端模式下必须使用SQL API才能在服务端进行数据筛选,并利用到二级索引;
通过setSchema来切换schema

privatevoidinsertBySql(intkey){ClientCache<Integer,Person>cache=ignite.cache(cacheName);cache.query(newSqlFieldsQuery("INSERTINTOperson(id,city_id,name,age,company)VALUES(?,?,?,?,?)").setArgs(key,1001,"SQL",30,"Midea").setSchema(schemaName)).getAll();}privatePersonselectBySql(intkey){ClientCache<Integer,Person>cache=ignite.cache(cacheName);try(FieldsQueryCursor<List<?>>cursor=cache.query(newSqlFieldsQuery("select*frompersonwhereid=?").setArgs(key).setSchema(schemaName))){for(List<?>row:cursor){Map<String,Object>map=newHashMap<>();for(inti=0;i<row.size();i){//列名作为keymap.put(cursor.getFieldName(i),row.get(i));}//使用jackson进行反序列化ObjectMappermapper=newObjectMapper();mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES,true);returnmapper.convertValue(map,Person.class);}}returnnull;}

发表评论

  • 评论列表
还没有人评论,快来抢沙发吧~