How can I update a broadcast variable in spark streaming ?

Asked on December 28, 2018 in Apache-spark.


  • 2 Answer(s)

    The code below uses a BroadcastWrapper that refreshes the broadcast variable once it is older than a given TTL.

    import org.apache.spark.SparkContext;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;
     
    // Driver-side singleton that re-broadcasts the reference data once it is older than a TTL.
    // ReferenceData and getRefData() stand for your own reference-data type and loader.
    public class BroadcastWrapper {
     
        private static final long TTL_MS = 60000; // Let's say we want to refresh every 1 min = 60000 ms
        private static final BroadcastWrapper obj = new BroadcastWrapper();
     
        private Broadcast<ReferenceData> broadcastVar;
        private long lastUpdatedAt = System.currentTimeMillis();
     
        private BroadcastWrapper() {}
     
        public static BroadcastWrapper getInstance() {
            return obj;
        }
     
        // Called on the driver (inside transform) once per batch.
        // synchronized because the singleton could be reached from more than one driver thread.
        public synchronized Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext) {
            long now = System.currentTimeMillis();
            if (broadcastVar == null || now - lastUpdatedAt > TTL_MS) {
                if (broadcastVar != null) {
                    // Remove the stale copies cached on the executors
                    broadcastVar.unpersist();
                }
                lastUpdatedAt = now;
     
                // Your logic to refresh
                ReferenceData data = getRefData();
     
                broadcastVar = JavaSparkContext.fromSparkContext(sparkContext).broadcast(data);
            }
            return broadcastVar;
        }
    }
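    The TTL check inside updateAndGet does not itself depend on Spark. As a minimal sketch, the hypothetical TtlRefresher class below isolates that logic with an injectable clock so it can be tried without a cluster: the loader stands in for getRefData() plus broadcast(), and the onExpire callback stands in for unpersist(). None of these names come from Spark.

```java
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical, Spark-free sketch of the TTL refresh used by BroadcastWrapper.
// loader   ~ getRefData() + broadcast(...)
// onExpire ~ broadcastVar.unpersist()
public final class TtlRefresher<T> {
    private final long ttlMillis;
    private final Supplier<T> loader;
    private final Consumer<T> onExpire;
    private final LongSupplier clock; // injectable so expiry can be tested without sleeping

    private T current;
    private long loadedAt;

    public TtlRefresher(long ttlMillis, Supplier<T> loader, Consumer<T> onExpire, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.loader = loader;
        this.onExpire = onExpire;
        this.clock = clock;
    }

    // Returns the current value, reloading it first if it is missing or older than the TTL.
    public synchronized T get() {
        long now = clock.getAsLong();
        if (current == null || now - loadedAt > ttlMillis) {
            if (current != null) {
                onExpire.accept(current); // release the stale value
            }
            current = loader.get();
            loadedAt = now;
        }
        return current;
    }

    public static void main(String[] args) {
        long[] fakeNow = {0L};
        int[] loads = {0};
        int[] expired = {0};
        TtlRefresher<String> r = new TtlRefresher<>(
                60_000L,
                () -> "v" + (++loads[0]),
                old -> expired[0]++,
                () -> fakeNow[0]);

        System.out.println(r.get()); // v1 (first load)
        fakeNow[0] = 30_000L;
        System.out.println(r.get()); // v1 (still within the TTL)
        fakeNow[0] = 61_000L;
        System.out.println(r.get()); // v2 (TTL exceeded, old value expired)
        System.out.println(expired[0]); // 1
    }
}
```

    Passing the clock in as a LongSupplier is only a testing convenience; in the real wrapper System.currentTimeMillis() is enough.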
    

    It is then used in the streaming job like this:

    public void startSparkEngine() {
     
        // The function passed to transform() runs on the driver for every batch,
        // so the broadcast is checked (and refreshed if stale) once per batch interval.
        final JavaDStream<MyObject> filteredStream = objectStream.transform(rdd -> {
            Broadcast<ReferenceData> refdataBroadcast =
                    BroadcastWrapper.getInstance().updateAndGet(rdd.context());
            return rdd.filter(obj -> obj.getField().equals(refdataBroadcast.value().getField()));
        });
     
        filteredStream.foreachRDD(rdd -> {
            rdd.foreach(obj -> {
                // Final processing of filtered objects
            });
        });
    }
    

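    This also works when the job runs on a multi-node cluster, because updateAndGet is only called on the driver (inside transform) and the executors just read whichever broadcast is current for that batch.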

    Answered on December 28, 2018.

    Streaming applications often need a way to weave reference data (from a DB, files, etc.) into the streaming data (filtering, lookups, etc.).

    This is how lookup reference data is used in the streaming operations:

    • First, create a CacheLookup object with the desired cache TTL.
    • Then wrap it in a Broadcast.
    • Use the CacheLookup as part of the streaming logic.
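    CacheLookup itself is not shown, so the following is only a hypothetical sketch of what such a class might look like, assuming Java serialization of the broadcast value: the TTL and a serializable loader travel inside the broadcast, while the cache is transient, so each executor that deserializes the broadcast builds and expires its own local copy. The names Loader, loadAll and lookup are illustrative, not from any library.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical TTL-based lookup meant to be wrapped in a Broadcast.
// Only ttlMillis and loader are serialized; the cache is transient and rebuilt lazily.
public class CacheLookup implements Serializable {
    private static final long serialVersionUID = 1L;

    // The loader must be serializable because it travels with the broadcast value.
    public interface Loader extends Serializable {
        Map<String, String> loadAll(); // e.g. read the reference table from a DB or file
    }

    private final long ttlMillis;
    private final Loader loader;

    private transient Map<String, String> cache; // null again after deserialization
    private transient long loadedAt;

    public CacheLookup(long ttlMillis, Loader loader) {
        this.ttlMillis = ttlMillis;
        this.loader = loader;
    }

    // Called from task code: reloads the whole table once the TTL has passed.
    // synchronized keeps the sketch simple; it serializes lookups within one JVM.
    public synchronized String lookup(String key) {
        long now = System.currentTimeMillis();
        if (cache == null || now - loadedAt > ttlMillis) {
            cache = new HashMap<>(loader.loadAll());
            loadedAt = now;
        }
        return cache.get(key);
    }

    // Demo only: a serialization round trip stands in for shipping the broadcast to an executor.
    public static void main(String[] args) throws IOException, ClassNotFoundException {
        CacheLookup driverCopy = new CacheLookup(60_000L, () -> {
            Map<String, String> m = new HashMap<>();
            m.put("us", "United States");
            return m;
        });
        System.out.println(driverCopy.lookup("us")); // United States

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(driverCopy);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            CacheLookup executorCopy = (CacheLookup) in.readObject();
            // The transient cache arrived empty, so this lookup triggers a local load.
            System.out.println(executorCopy.lookup("us")); // United States
        }
    }
}
```

    On the driver this would be wrapped with something like jsc.broadcast(new CacheLookup(...)) and read in tasks through broadcast.value().lookup(key). Note that this only refreshes on a timer; it does not give the driver a way to push an invalidation to the executors.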

    Mostly this works fine, except for the following:

    Updating the reference data

    Despite the suggestions in these threads, the only way to do this is to kill the previous broadcast variable and create a new one, which leaves several unknowns about what to expect between those two operations.

    This is a very common need, and it would help if there were a way to send a message to a broadcast variable informing it of an update.

    With that, it would be possible to invalidate the local caches in “CacheLookup”.

    This part of the issue is still unsolved; I would be interested in any viable approach.

    Answered on December 28, 2018.

