scala - Creating an actorpublisher and actorsubscriber with same actor -
i'm newbie akka streams. i'm using kafka source(using reactivekafka library) , processing of data through flow , using subscriber(eshandler) sink.
now need handle errors , push different kafka queue through error handler. i'm trying use eshandler both publisher , subscriber. i'm not sure how include eshandler middle man instead of sink.
this code:
val publisher = kafka.kafka.consume(topic, "es", new stringdecoder()) val flow = flow[string].map { elem => jsonconverter.convert(elem.tostring()) } val sink = sink.actorsubscriber[genmodel](props(classof[eshandler])) source(publisher).via(flow).to(sink).run() class eshandler extends actorsubscriber actorpublisher[model] { val requeststrategy = watermarkrequeststrategy(100) def receive = { case onnext(msg: model) => context.actorof(props(classof[esstorage], self)) ! msg case onerror(err: exception) => context.stop(self) case oncomplete => context.stop(self) case response(msg) => if (msg.iserror()) onnext(msg.getcontent()) } } class errorhandler extends actorsubscriber { val requeststrategy = watermarkrequeststrategy(100) def receive = { case onnext(msg: model) => println(msg) } }
we highly recommend against implementing own processor (which name reactive streams spec gives "subscriber && publisher". pretty hard right, why there's not publisher exposed directly helper trait.
instead, of time you'll want use sources/sinks (or publishers/subscribers) provided , run operations between those, map/filter etc. steps.
in fact, there existing implementation kafka sources , sinks can use, it's called reactive-kafka , verified reactive streams tck, can trust valid implementations.
Comments
Post a Comment