前言 使用kafkaStream进行流式计算时,如果需要对数据进行状态处理,那么常用的会遇到kafkaStream的store,而store也有Local Store以及Global Store,当然也可以使用其他方案的来进行状态保存,文本主要理清楚kafkaStream中的Local Store以及Global Store之间的区别和用法,以及什么时候选择何种store和当store无法满足我们需求时,应该如何使用其他方案来进行数据的状态保存
本文所有方法和代码皆只针对kafka-streams的3.7.0版本,pom如下:
1 2 3 4 5 <dependency > <groupId > org.apache.kafka</groupId > <artifactId > kafka-streams</artifactId > <version > 3.7.0</version > </dependency >
由于不同版本的KafkaStream在使用上有较大区别,也因为KafkaStream不同版本API改动较大,所以如果版本不一致,使用方法甚至是一些核心概念都会跟本文讲述有所出入,并且KafkaStream由于相对小众,文档也很少,官网的文档也只是一些简单介绍,所以需要注意避坑
Local Store和Global Store的共同点和区别点 共同点: 1、都是用于流式计算中进行状态存储的
2、具体结构类似,使用的都是如:KeyValueStore,SessionStore等类
3、实际机制类似,会通过内存、本地目录和kafka Topic的变更记录等方式来进行缓存数据更新和恢复
不同点 1、适用场景不同
Local Store 适合用于单个实例的状态管理,适合处理单个分区的数据,并且缓存数据不会多个实例共享
Global Store 适用于跨实例共享数据状态,多个实例通过Topic中的更新记录来跟新进程中的数据
2、使用方法不同
Local Store 可以直接在代码中调用对应类型存储(如:KeyValueStore)的put方法进行更新数据,不需要考虑数据一致性(因为可见性只有单个实例)
Global Store 不能直接调用对应的put和delete方法,所有更新和删除缓存都需要通过发送数据到Global 配置的topic中,然后自行实现Topic数据消费者(实现:org.apache.kafka.streams.processor.api.Processor类),在消费者类中进行数据更新等操作,同时因为需要自己实现更新实例中的数据逻辑,数据一致性也需要开发者自行处理,虽然正常来说利用Kafka本身的特性很少出现数据一致性问题,但是如果多实例之间性能差异和网络环境等差异,容易将数据不一致的时长延长,如果要求Store一致性强且容忍数据不一致时限短,则需要注意考虑Store更新数据消费者的处理能力
3、扩展性
Local Store :可以通过增加输入主题的分区数来扩展处理能力,但每个实例仍然独立运行。
Global Store :需要在多个实例之间共享状态,因此在设计时需要考虑如何高效地管理和同步状态。
常见的Store 类型
1 2 3 4 5 org.apache.kafka.streams.state.KeyValueStore org.apache.kafka.streams.state.SessionStore org.apache.kafka.streams.state.TimestampedKeyValueStore org.apache.kafka.streams.state.VersionedKeyValueStore org.apache.kafka.streams.state.WindowStore
需要根据实际使用场景选择合适的状态存储类
用法 Local Store 第一步,先生成对应类型的StoreBuilder对象,如我需要用KeyValueStore,然后状态存储的名字是:testLocalStore(这个名字不能重复,因为会根据消费者id加储存名称创建对应的Topic,当然如果是不同的KafkaStream程序,消费者id不一致,那么重复就没有关系了),因为是KeyValue类型的储存,所以需要设定对应的Key和Value数据的序列化对象,具体代码如下:
1 StoreBuilder<KeyValueStore<String, String>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testLocalStore" ), Serdes.String(), Serdes.String());
其中Stores.persistentKeyValueStore代表的我得存储是持久化的,正常都是会用持久化,当然也有存储一些不重要或者程序重启丢失也无所谓的状态数据,可以使用Stores.inMemoryKeyValueStore以及基于LRU淘汰机制的储存Stores.lruMap,第二个参数Serdes.String()代表存储数据的key是字符串,第三个参数同理,如果是要存储一些对象,也可以使用自定义的序列化类,实现
1 org.apache.kafka.common.serialization.Serializer
序列化类,以及反序列化类
1 org.apache.kafka.common.serialization.Deserializer
然后定义好即可,如:
1 2 new Serdes .WrapperSerde<>(new KryoSerializer <>(TestStoreBean.class), new KryoDeserializer <>(TestStoreBean.class)
其中KryoSerializer和KryoDeserializer是我自定义的使用Kryo序列化Java对象的类,TestStoreBean是我保存的状态的数据封装bean
KryoSerializer代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 import com.esotericsoftware.kryo.Kryo;import com.esotericsoftware.kryo.io.Output;import org.apache.kafka.common.serialization.Serializer;import java.io.ByteArrayOutputStream;public class KryoSerializer <T> implements Serializer <T> { private static final ThreadLocal<Kryo> KRYO_LOCAL = new ThreadLocal <Kryo>() { @Override protected Kryo initialValue () { Kryo kryo = new Kryo (); kryo.setReferences(true ); kryo.setRegistrationRequired(false ); return kryo; } }; public static Kryo getInstance () { return KRYO_LOCAL.get(); } private Class<T> clz; public KryoSerializer (Class<T> clz) { this .clz = clz; } @Override public byte [] serialize(String s, T t) { if (t == null ){ return null ; } ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream (); Output output = new Output (byteArrayOutputStream); Kryo kryo = getInstance(); kryo.writeObjectOrNull(output, t,clz); output.flush(); return byteArrayOutputStream.toByteArray(); } }
KryoDeserializer代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 import com.esotericsoftware.kryo.Kryo;import com.esotericsoftware.kryo.io.Input;import org.apache.kafka.common.serialization.Deserializer;import java.io.ByteArrayInputStream;public class KryoDeserializer <T> implements Deserializer <T> { private static final ThreadLocal<Kryo> KRYO_LOCAL = new ThreadLocal <Kryo>() { @Override protected Kryo initialValue () { Kryo kryo = new Kryo (); kryo.setReferences(true ); kryo.setRegistrationRequired(false ); return kryo; } }; public static Kryo getInstance () { return KRYO_LOCAL.get(); } private Class<T> clz; public KryoDeserializer (Class<T> clz) { this .clz = clz; } @Override public T deserialize (String s, byte [] bytes) { if (bytes == null ){ return null ; } ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream (bytes); Input input = new Input (byteArrayInputStream); Kryo kryo = getInstance(); try { return kryo.readObjectOrNull(input, clz); }catch (Exception e){ e.printStackTrace(); } return null ; } }
同理,使用LocalStore时,可以将代码替换成以下内容:
1 2 StoreBuilder<KeyValueStore<String, TestStoreBean>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testLocalStore" ), Serdes.String(), new Serdes .WrapperSerde<>(new KryoSerializer <>(TestStoreBean.class), new KryoDeserializer <>(TestStoreBean.class));
有了StoreBuilder对象之后,直接在StreamsBuilder对象中添加即可
1 streamsBuilder.addStateStore(kvBuilder);
需要使用时,先在处理数据的Processor类中的init方法获取对应的状态存储对象
1 this .testLocalStore = context.getStateStore("testLocalStore" );
然后就可以在process方法中调用testLocalStore的get、put、delete等方法操作状态存储数据了,具体代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Slf4j public static class StreamProcessor implements Processor <String,String,String,String> { private KeyValueStore<String,String> testLocalStore; private ProcessorContext context; private String toTopic; @Override public void init (ProcessorContext context) { this .context = context; this .testLocalStore = context.getStateStore("testLocalStore" ); } public StreamProcessor (String toTopic) { this .toTopic = toTopic; } @Override public void process (Record<String, String> record) { testLocalStore.put("key1" ,"testValue1" ); log.info("testLocalStore key1 : {}" ,testLocalStore.get("key1" )); testLocalStore.delete("key1" ); context.forward(record,toTopic); } }
其中实现的Processor类全称是:org.apache.kafka.streams.processor.api.Processor,上面代码只是在数据处理流程中简单保存了数据,然后获取出来以及删除,没有对流数据做任何处理,就直接发送到输出的topic了
完整代码如下:
1 2 3 4 5 6 7 8 9 10 @Bean public KStream<String,String> kStreamTestStore (StreamsBuilder streamsBuilder) { log.info("init kStreamTestStore" ); StoreBuilder<KeyValueStore<String, String>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testLocalStore" ), Serdes.String(), Serdes.String()); streamsBuilder.addStateStore(kvBuilder); KStream<String, String> stream = streamsBuilder.stream(fromTopic); stream.process(()->new StreamProcessor (toTopic), Named.as(fromTopic),"testLocalStore" ); streamsBuilder.build().addSink(toTopic,toTopic,fromTopic); return stream; }
注意:由于使用Store需要通过ProcessorContext对象来获取Store对象,所以在KafkaStream常用的一些map,mapValue,flatMapValues这些流式计算方法中是没办法使用的,只能在一些更底层的Api中去使用,如process
Global Store 同Local Store一样,需要先生成对应类型的StoreBuilder对象,代码跟Local Store一样
1 StoreBuilder<KeyValueStore<String, String>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testGlobalStore" ), Serdes.String(), Serdes.String());
然后定义处理状态更新日志的Processor类,在这个类中,可以对缓存数据进行更新和删除操作(其他地方都是不能直接修改Global Store的)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class GlobalStoreHandleProcessor <K, V> implements Processor <K,V,Void,Void> { private KeyValueStore<K, V> store; private String storeName; public GlobalStoreHandleProcessor (String storeName) { this .storeName = storeName; } @Override public void process (Record<K,V> record) { if (record == null || record.value() == null ) { return ; } store.put(record.key(), record.value()); } @Override public void init (ProcessorContext context) { this .store = context.getStateStore(storeName); } }
跟KafkaStream的process是一样的,只需要在process方法中对缓存进行更新或者删除操作即可,我这里只是简单put操作,具体逻辑可以根据自己情况进行处理
在StreamsBuilder对象中添加StoreBuilder对象
1 2 streamsBuilder.addGlobalStore(kvBuilder,"testGlobalStore" , Consumed.with(Serdes.String(),Serdes.String()), ()->new GlobalStoreHandleProcessor <>("testGlobalStore" ));
其中第二个参数testGlobalStore是Global Store绑定的数据变更记录的Topic,如果要更新,则需要通过向这个topic发送数据来进行更新Global Store中的数据
处理数据的Processor类实例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public static class StreamProcessor implements Processor <String,String,String,String> { private KeyValueStore<String,String> testGlobalStore; private ProcessorContext context; private String toTopic; @Override public void init (ProcessorContext context) { this .context = context; this .testGlobalStore = context.getStateStore("testGlobalStore" ); } public StreamProcessor (String toTopic) { this .toTopic = toTopic; } @Override public void process (Record<String, String> record) { testLocalStore.put(jsonObject.getString("key" ),jsonObject.getString("value" )); log.info("testLocalStore key1 : {}" ,testGlobalStore.get("key1" )); context.forward(new Record ("testGlobalKey" ,"global value" ,record.timestamp()),"testGlobalStore" ); context.forward(record,toTopic); } }
与Local Store不同的是,不能在处理数据流的时候,对缓存进行put操作,只能通过将数据发送到Global Store关联的topic中,在GlobalStoreHandleProcessor中去做更新
完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 @Bean public KStream<String,String> kStreamTestStore (StreamsBuilder streamsBuilder) { log.info("init kStreamTestStore" ); StoreBuilder<KeyValueStore<String, String>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testGlobalStore" ), Serdes.String(), Serdes.String()); streamsBuilder.addGlobalStore(kvBuilder,"testGlobalStore" , Consumed.with(Serdes.String(),Serdes.String()), ()->new GlobalStoreHandleProcessor <>("testGlobalStore" )); KStream<String, String> stream = streamsBuilder.stream(fromTopic); stream.process(()->new StreamProcessor (toTopic), Named.as(fromTopic)); streamsBuilder.build().addSink(toTopic,toTopic,fromTopic); streamsBuilder.build().addSink("testGlobalStore" ,"testGlobalStore" ,fromTopic); return stream; }
与Local Store不同点在于,不需要在process方法中添加store的名字,但是因为要从process方法中直接将更新Store的数据发送到topic,所以需要添加一个Global Store绑定的Topic的输出扩展,也就是下面这行代码
1 streamsBuilder.build().addSink("testGlobalStore" ,"testGlobalStore" ,fromTopic);
不适合的场景 由于KafkaStream Store 没有自动过期数据和过期数据自动删除的概率(可能是有,但是我没有找到对应文档),所以如果我们存储的key集合特别大,并且需要自动过期和自动删除,那么就不适合使用Store来处理了,因为需要我们自行处理删除逻辑,尤其是有些场景中,并不会对过期的key进行访问,所以采用惰性删除基本上不现实,但是定时删除,因为Store会存储到磁盘,如果存储的key很多,删除对应数据的时候耗时很长,尤其是单次删除大量key的时候,可能会直接超时,并且还必须要自己处理定时删除的逻辑,想要更好的去删除,就需要大量时间去开发和优化。
虽然使用内存的Store能稍微好点,但是毕竟单个进程内存有限,并且正常流处理中,如果需要保存状态,那么肯定是希望进程重启之后,能恢复数据,避免计算出错的,所以如果是有大量不重复key,并且数据需要到期自动删除的话,可以直接使用Redis做状态存储,并且进过我得实际测试,使用Redis并不比Store慢,并且在key量越来越大的情况下,Redis的性能是完全优于Store的(只针对持久化的Store),当然使用Redis,还是会更使用Global Store一样,需要考虑数据一致性的问题,不过这个问题可以通过将相同key的数据从Kafka Topic就分配到同一个Topic分区中来避免