scala - Creating an actorpublisher and actorsubscriber with same actor -


i'm newbie akka streams. i'm using kafka source(using reactivekafka library) , processing of data through flow , using subscriber(eshandler) sink.

now need handle errors , push different kafka queue through error handler. i'm trying use eshandler both publisher , subscriber. i'm not sure how include eshandler middle man instead of sink.

this code:

val publisher = kafka.kafka.consume(topic, "es", new stringdecoder())  val flow = flow[string].map { elem => jsonconverter.convert(elem.tostring()) }  val sink = sink.actorsubscriber[genmodel](props(classof[eshandler]))  source(publisher).via(flow).to(sink).run()   class eshandler extends actorsubscriber actorpublisher[model] {    val requeststrategy = watermarkrequeststrategy(100)    def receive = {     case onnext(msg: model) =>       context.actorof(props(classof[esstorage], self)) ! msg      case onerror(err: exception) =>       context.stop(self)      case oncomplete =>       context.stop(self)      case response(msg) =>       if (msg.iserror()) onnext(msg.getcontent())   } }  class errorhandler extends actorsubscriber {    val requeststrategy = watermarkrequeststrategy(100)    def receive = {     case onnext(msg: model) =>       println(msg)   } } 

we highly recommend against implementing own processor (which name reactive streams spec gives "subscriber && publisher". pretty hard right, why there's not publisher exposed directly helper trait.

instead, of time you'll want use sources/sinks (or publishers/subscribers) provided , run operations between those, map/filter etc. steps.

in fact, there existing implementation kafka sources , sinks can use, it's called reactive-kafka , verified reactive streams tck, can trust valid implementations.


Comments

Popular posts from this blog

How to provide Authorization & Authentication using Asp.net, C#? -

toolbar - How to add link to user registration inside toobar in admin joomla 3 custom component -

How to use Authorization & Authentication in Asp.net, C#? -