{"id":503,"date":"2025-11-25T17:10:51","date_gmt":"2025-11-25T17:10:51","guid":{"rendered":"https:\/\/guillaumesblog.net\/?p=503"},"modified":"2025-11-26T16:44:16","modified_gmt":"2025-11-26T16:44:16","slug":"how-do-companies-collect-data-and-act-on-it-almost-in-real-time","status":"publish","type":"post","link":"https:\/\/guillaumesblog.net\/index.php\/how-do-companies-collect-data-and-act-on-it-almost-in-real-time\/","title":{"rendered":"What software allows companies to react to inputs in real-time?"},"content":{"rendered":"\n<p class=\"wp-block-paragraph\">To illustrate how this works, let&#8217;s look at two pervasive technologies: Spark and Kafka. Both have become hugely popular because of the need for real-time data processing, for instance in self-driving cars, ride-sharing apps, fraud detection, or any application where the user needs to be warned immediately. Let&#8217;s build a toy real-time weather recording and alerting app and experiment together.<\/p>\n\n\n\n<!--more-->\n\n\n\n<p class=\"wp-block-paragraph\">Before we dive in, a few observations on the big data ecosystem: it is convoluted, code-heavy and Ops-heavy (it&#8217;s a distributed system), and as such it is not a beginner-friendly specialisation. That is one of the reasons why, in my opinion, several vendors are trying to make this space more approachable for people who are not system administrators. The idea is to put the data in the hands of the &#8220;data people&#8221; without all the overhead you are about to get a taste of.<\/p>\n\n\n\n<p class=\"wp-block-paragraph\">Also, and I know you have not asked for it but I insist, a short history lesson for context: in the first half of the 2000s, &#8220;webscale&#8221; companies like Google and Amazon had to make use of hundreds of terabytes or even petabytes of data, and that was not an easy thing to do back then. 
That was, and still is, <strong>the &#8220;raison d&#8217;&#234;tre&#8221; of Hadoop: to <em>store<\/em> that data and <em>do something<\/em> with it, across a fleet of <em>commodity servers<\/em> that you can easily scale horizontally.<\/strong> Traditional databases struggle with that: an RDBMS scales well vertically, but horizontally you hit limitations and have to resort to pricey appliances or awkward technical workarounds. Hadoop has its roots in two Google papers, &#8220;The Google File System&#8221; (2003) and &#8220;MapReduce&#8221; (2004), and was built as an open-source implementation of those ideas. Hadoop is a toolbox with a myriad of utilities stacked on top of each other, but at its core it is a <em>distributed<\/em> file system (HDFS) that spreads storage across many machines (a bit like vSAN or S3), a resource manager (YARN) and a compute layer (MapReduce) with which you would do <a href=\"https:\/\/en.wikipedia.org\/wiki\/MapReduce\">groupBys and filtering<\/a>. Spark is a successor to MapReduce, and it is both an engine and a library of functions to call against your source data, whether structured or semi-structured. Leaving the details aside, one key element is the paradigm shift from &#8220;schema-on-write&#8221; to &#8220;schema-on-read&#8221;: you do not have to clean and model the data up front to query it with Spark, as you would with a data warehouse. Big data is an umbrella term that goes beyond Hadoop, but historically it has been largely synonymous with it.<\/p>\n\n\n\n<p class=\"wp-block-paragraph\">What we use in this exercise is Spark Structured Streaming, Spark&#8217;s stream processing engine, to continuously process data from a Kafka-based data pipeline. By default it runs the query as a series of small micro-batches, so there is no separate batch job to schedule. 
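<\/p>\n\n\n\n<p class=\"wp-block-paragraph\">To make that processing model concrete, here is a toy, plain-Python sketch of the micro-batch loop that Structured Streaming runs by default. It is not Spark&#8217;s code and it makes simplifying assumptions: an in-memory list stands in for a Kafka partition, a record&#8217;s index plays the role of its offset, and a dict plays the role of the checkpoint. <code>run_micro_batches<\/code> and <code>record<\/code> are hypothetical helpers written for this illustration.<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code># Toy model of a micro-batch loop (not Spark's implementation).\ndef run_micro_batches(topic, checkpoint, process_batch, max_batch_size=3):\n    checkpoint.setdefault('offset', 0)\n    checkpoint.setdefault('batch_id', 0)\n    while checkpoint&#91;'offset'] &lt; len(topic):\n        start = checkpoint&#91;'offset']\n        end = min(start + max_batch_size, len(topic))\n        process_batch(topic&#91;start:end], checkpoint&#91;'batch_id'])\n        # Progress is recorded only after the batch succeeds, so a crash\n        # before this point means the same batch is re-read on restart.\n        checkpoint&#91;'offset'] = end\n        checkpoint&#91;'batch_id'] += 1\n    return checkpoint&#91;'offset']\n\n\nseen = &#91;]\nstate = {}\ntemps = &#91;4.0, 6.5, 3.2, 7.1, 5.0]\n\n\ndef record(batch, batch_id):\n    seen.append((batch_id, batch))\n\n\nrun_micro_batches(temps, state, record)\n# seen is now &#91;(0, &#91;4.0, 6.5, 3.2]), (1, &#91;7.1, 5.0])]\n\ntemps.append(2.0)  # a new message lands on the topic\nrun_micro_batches(temps, state, record)\n# only the new record is read, as batch 2: seen&#91;-1] == (2, &#91;2.0])<\/code><\/pre>\n\n\n\n<p class=\"wp-block-paragraph\">The second call only processes the record that arrived after the first one, because the checkpoint remembers how far we got. Spark&#8217;s real checkpoint plays the same role for Kafka offsets, with more machinery (separate offset and commit logs) so that it survives crashes.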
<\/p>\n\n\n\n<figure class=\"wp-block-image size-large\"><img loading=\"lazy\" decoding=\"async\" width=\"1024\" height=\"520\" src=\"https:\/\/guillaumesblog.net\/wp-content\/uploads\/2025\/11\/kafka-spark-blog.drawio-1-1024x520.png\" alt=\"\" class=\"wp-image-524\" srcset=\"https:\/\/guillaumesblog.net\/wp-content\/uploads\/2025\/11\/kafka-spark-blog.drawio-1-1024x520.png 1024w, https:\/\/guillaumesblog.net\/wp-content\/uploads\/2025\/11\/kafka-spark-blog.drawio-1-300x152.png 300w, https:\/\/guillaumesblog.net\/wp-content\/uploads\/2025\/11\/kafka-spark-blog.drawio-1-768x390.png 768w, https:\/\/guillaumesblog.net\/wp-content\/uploads\/2025\/11\/kafka-spark-blog.drawio-1-1536x779.png 1536w, https:\/\/guillaumesblog.net\/wp-content\/uploads\/2025\/11\/kafka-spark-blog.drawio-1-1200x609.png 1200w, https:\/\/guillaumesblog.net\/wp-content\/uploads\/2025\/11\/kafka-spark-blog.drawio-1.png 2012w\" sizes=\"auto, (max-width: 709px) 85vw, (max-width: 909px) 67vw, (max-width: 1362px) 62vw, 840px\" \/><\/figure>\n\n\n\n<p class=\"wp-block-paragraph\">From left to right: we pull data from the <a href=\"https:\/\/open-meteo.com\/en\/docs?current=temperature_2m#15_minutely_parameter_definition\">OpenMeteo API<\/a> on the Edge computer (in real life I would have sensors, let&#8217;s say along a river, sending MQTTS messages to my edge computer, but I am not doing that here). Then I send the sensor data to a Kafka cluster over TLS, so the message content is encrypted in transit (<em>edit: I should have used mTLS here to authenticate the clients too; the certificates have to be distributed to every resource involved anyway, whether on OCI or not, so you should always do that<\/em>), and I keep the Kafka broker hidden behind an NLB. Kafka buffers the stream on an encrypted disk (which also gives us persistence, among other important things), then distributes the data to any authorised consumer. 
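<\/p>\n\n\n\n<p class=\"wp-block-paragraph\">On the producer side, the Edge loop boils down to: fetch a reading, serialise it to bytes, hand it to Kafka, repeat. Here is a minimal, standard-library-only sketch of that loop; it is not the code from my repository. The Open-Meteo URL and query parameters are assumptions to adapt, and <code>send<\/code> is a hypothetical hook: with the kafka-python library it could be the <code>send<\/code> method of a <code>KafkaProducer<\/code> configured for TLS. <code>iterations<\/code> and <code>sleep<\/code> only exist so the loop can be tested; leave <code>iterations<\/code> at <code>None<\/code> to loop forever like the real producer.<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>import json\nimport time\nfrom urllib.parse import urlencode\nfrom urllib.request import urlopen\n\n# Assumed endpoint: Open-Meteo's forecast API, asking only for the current temperature.\nBASE_URL = 'https:\/\/api.open-meteo.com\/v1\/forecast'\n\n\ndef build_url(latitude, longitude):\n    query = urlencode({'latitude': latitude, 'longitude': longitude, 'current': 'temperature_2m'})\n    return f'{BASE_URL}?{query}'\n\n\ndef fetch_json(url):\n    # Real HTTP call; replaced by a fake when testing.\n    with urlopen(url, timeout=10) as resp:\n        return json.loads(resp.read().decode('utf-8'))\n\n\ndef to_kafka_value(payload):\n    # Kafka stores raw bytes, so the JSON is serialised and UTF-8 encoded.\n    return json.dumps(payload).encode('utf-8')\n\n\ndef produce(send, latitude, longitude, fetch=fetch_json,\n            iterations=None, interval_s=60, sleep=time.sleep):\n    # send(topic, value) is a hypothetical hook, e.g. KafkaProducer.send.\n    # interval_s=60 is an arbitrary polling interval.\n    url = build_url(latitude, longitude)\n    count = 0\n    while iterations is None or count &lt; iterations:\n        send('weather', to_kafka_value(fetch(url)))\n        count += 1\n        sleep(interval_s)<\/code><\/pre>\n\n\n\n<p class=\"wp-block-paragraph\">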
Here the &#8220;consumer&#8221; is a Spark application that reads the &#8220;weather&#8221; topic, selects only some fields, and then does two things: it sends a real-time alert, and it groups the temperatures into 15-minute windows to compute their average, which it finally writes out to Object Storage. The data saved in Object Storage can then be used for analysis with tools like AIDP, Databricks, Tableau, etc. It is worth noting that both Kafka and Spark can run as clusters, are designed for scale and are fault tolerant, which is another massive upside.<\/p>\n\n\n\n<p class=\"wp-block-paragraph\">The code for the Edge computer can be found <a href=\"https:\/\/github.com\/geddegda\/streaming_exploration\">here<\/a>. We create a Kafka producer, set up the endpoint and enter an infinite while loop that continuously polls the OpenMeteo API. We send the responses to the Kafka broker; in Kafka terms, the Edge computer is a &#8220;producer&#8221;.<\/p>\n\n\n\n<p class=\"wp-block-paragraph\">Then the NLB: nothing special here, just use the correct network security group (NSG) or the security list associated with the subnet to secure it. My personal preference is to leave the security list alone and to use a dedicated NSG for each case. The NLB forwards TCP traffic to port 9095, which is where my Kafka broker listens on a private IP in the private subnet, and the health checks also target port 9095. 
I assume you are familiar with the cloud.<\/p>\n\n\n\n<figure class=\"wp-block-image size-large\"><img loading=\"lazy\" decoding=\"async\" width=\"1024\" height=\"299\" src=\"https:\/\/guillaumesblog.net\/wp-content\/uploads\/2025\/11\/nlb-1024x299.png\" alt=\"\" class=\"wp-image-530\" srcset=\"https:\/\/guillaumesblog.net\/wp-content\/uploads\/2025\/11\/nlb-1024x299.png 1024w, https:\/\/guillaumesblog.net\/wp-content\/uploads\/2025\/11\/nlb-300x88.png 300w, https:\/\/guillaumesblog.net\/wp-content\/uploads\/2025\/11\/nlb-768x224.png 768w, https:\/\/guillaumesblog.net\/wp-content\/uploads\/2025\/11\/nlb-1200x351.png 1200w, https:\/\/guillaumesblog.net\/wp-content\/uploads\/2025\/11\/nlb.png 1461w\" sizes=\"auto, (max-width: 709px) 85vw, (max-width: 909px) 67vw, (max-width: 1362px) 62vw, 840px\" \/><\/figure>\n\n\n\n<p class=\"wp-block-paragraph\">On the Kafka machine, install a compatible Java version and set JAVA_HOME. You can follow the <a href=\"https:\/\/kafka.apache.org\/quickstart\">quickstart<\/a> and create a &#8220;topic&#8221;; in our case I have a &#8220;weather&#8221; topic. Now, here is the gotcha: if you want Kafka to listen on anything other than localhost, and if you want to set up TLS, you need to fiddle with the broker configuration file (notably the <code>listeners<\/code> and <code>advertised.listeners<\/code> settings). The good news is that <em>yours truly<\/em> has made one <a href=\"https:\/\/github.com\/geddegda\/streaming_exploration\">available<\/a> for you. You also have to use the <code>keytool<\/code> command to generate the keystores and keys; it is roughly the Java equivalent of OpenSSL.<\/p>\n\n\n\n<p class=\"wp-block-paragraph\">Now for the Spark machine: <a href=\"https:\/\/spark.apache.org\/downloads.html\">get Spark<\/a> and its Python API, <a href=\"https:\/\/pypi.org\/project\/pyspark\/\">PySpark<\/a>, by running <em>pip install pyspark<\/em> in your Python virtual environment. 
Now let&#8217;s have a look at my Spark <a href=\"https:\/\/github.com\/geddegda\/streaming_exploration\">code<\/a>.<\/p>\n\n\n\n<p class=\"wp-block-paragraph\">Let&#8217;s go through it together. First we create a SparkSession, the top-level entry point that holds the configuration for connecting to the data source (Kafka) and to the destination (OCI Object Storage), which in the streaming world is called a &#8220;sink&#8221;.<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>from pyspark.sql import SparkSession\n\n# spark.jars.packages pulls the Kafka source connector (Scala 2.13 build for Spark 4.0.1);\n# the spark.hadoop.fs.oci.* settings configure the OCI HDFS connector (IDs redacted).\nspark = SparkSession.builder \\\n    .appName(\"KafkaBatchtoOCIBucket\") \\\n    .config(\"spark.jars.packages\", \"org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.1\") \\\n    .config(\"spark.hadoop.fs.oci.client.hostname\", 'https:\/\/objectstorage.eu-marseille-1.oraclecloud.com') \\\n    .config(\"spark.hadoop.fs.oci.client.regionCodeOrId\", 'eu-marseille-1') \\\n    .config('spark.hadoop.fs.oci.client.auth.tenantId', 'ocid1.tenancy.oc1..') \\\n    .config('spark.hadoop.fs.oci.client.auth.userId', 'ocid1.user.oc1..') \\\n    .config('spark.hadoop.fs.oci.client.auth.fingerprint', 'ac:a3:6c:blablah:f9:b2:2b:3c') \\\n    .config('spark.hadoop.fs.oci.client.auth.pemfilepath', '\/home\/debian\/.ssh\/oci.key.pem') \\\n    .getOrCreate()<\/code><\/pre>\n\n\n\n<p class=\"wp-block-paragraph\">After that we define the structure of the incoming Kafka data. Remember that on the Edge computer I am feeding Kafka with the JSON response from <a href=\"https:\/\/open-meteo.com\/en\/docs?current=temperature_2m#15_minutely_parameter_definition\">Open Meteo<\/a>? Now I have to tell Spark the exact format to expect so it can reconstitute the messages. 
This is because Kafka hands each message over to Spark as raw bytes, in a column called &#8220;value&#8221;, which is quite annoying.<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>from pyspark.sql.types import DoubleType, LongType, StringType, StructField, StructType\n\n# Mirrors the JSON response the Edge computer gets from Open-Meteo.\nschema = StructType(&#91;\n    StructField(\"latitude\", DoubleType()),\n    StructField(\"longitude\", DoubleType()),\n    StructField(\"generationtime_ms\", DoubleType()),\n    StructField(\"utc_offset_seconds\", LongType()),\n    StructField(\"timezone\", StringType()),\n    StructField(\"timezone_abbreviation\", StringType()),\n    StructField(\"elevation\", DoubleType()),\n    # Unit labels such as iso8601 or seconds, so every field is a string.\n    StructField(\"current_units\", StructType(&#91;\n        StructField(\"time\", StringType()),\n        StructField(\"interval\", StringType()),\n        StructField(\"temperature_2m\", StringType()),\n    ])),\n    # The actual reading.\n    StructField(\"current\", StructType(&#91;\n        StructField(\"time\", StringType()),\n        StructField(\"interval\", LongType()),\n        StructField(\"temperature_2m\", DoubleType()),\n    ])),\n])<\/code><\/pre>\n\n\n\n<p class=\"wp-block-paragraph\">Next we create the streaming DataFrame (<a href=\"https:\/\/spark.apache.org\/docs\/latest\/api\/python\/user_guide\/dataframes.html\">DataFrames<\/a> are Spark&#8217;s bread and butter) from the Kafka messages: we subscribe to the &#8220;weather&#8221; topic and tell Spark not to fail if some data is missing. Every Kafka message has an offset, a sequential position within its partition, and by default Spark stops with an error if offsets it expected to read are no longer available, for example because retention deleted them or the topic was recreated. 
That default protects you from silently skipping data; setting <code>failOnDataLoss<\/code> to false trades that guarantee for a query that keeps running, which is fine for a toy app.<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code># Subscribe to the weather topic; startingOffsets only applies when a query\n# starts without an existing checkpoint.\ndf = spark.readStream.format(\"kafka\") \\\n    .option(\"kafka.bootstrap.servers\", \"10.150.1.5:9092\") \\\n    .option(\"subscribe\", \"weather\") \\\n    .option(\"startingOffsets\", \"earliest\") \\\n    .option(\"failOnDataLoss\", \"false\") \\\n    .load()<\/code><\/pre>\n\n\n\n<p class=\"wp-block-paragraph\">We then apply a few transformations to the DataFrame: we keep only the time and temperature, make sure the time column has a timestamp type, and group the temperatures into 15-minute windows to compute the average. The watermark tells Spark how long to wait for late events before it treats a window as final.<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>from pyspark.sql.functions import avg, col, from_json, to_timestamp, window\n\n# Kafka's value column is binary: cast it to a string, then parse it with the schema.\nvalue_df = df.select(from_json(col(\"value\").cast(\"string\"), schema).alias(\"value\"))\nselect_what_i_want_df = value_df.selectExpr('value.current.time as time', 'value.current.temperature_2m as temp')\nmake_sure_time_is_formatted_df = select_what_i_want_df.withColumn('time', to_timestamp('time'))\n# 15-minute tumbling windows; events more than 15 minutes behind the latest event time may be dropped.\ngrouped_by_time_df = make_sure_time_is_formatted_df.withWatermark('time', '15 minutes').groupBy(window('time', '15 minutes')).agg(avg('temp'))<\/code><\/pre>\n\n\n\n<p class=\"wp-block-paragraph\">Now we are ready: we start the first streaming query, which hands each micro-batch of data to a function.<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code># create_alerts (shown below) must be defined before this query is started.\nalerts_query = make_sure_time_is_formatted_df \\\n    .writeStream \\\n    .foreachBatch(create_alerts) \\\n    .outputMode(\"update\") \\\n    .start()<\/code><\/pre>\n\n\n\n<p class=\"wp-block-paragraph\">We call the create_alerts function for each micro-batch (each &#8220;volley&#8221;); it keeps only the rows where the temperature is below 5 and appends them to a local file. 
(In real life you would trigger an email or send the alert to a Splunk\/Dynatrace-like system rather than write to a local file.)<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>from pyspark.sql.functions import col\n\n\ndef create_alerts(batch_df, batch_id):\n    # Keep only the cold readings from this micro-batch.\n    alerts = batch_df.filter(col('temp') &lt; 5)\n    # collect() pulls the rows to the driver: fine for a handful of alerts.\n    rows = alerts.collect()\n    with open(\"alerts.txt\", \"a\", encoding=\"utf-8\") as f:\n        for r in rows:\n            f.write(f\"Alert on {r&#91;'time']} with temperature {r&#91;'temp']}..\\n\")<\/code><\/pre>\n\n\n\n<p class=\"wp-block-paragraph\">Our real-time results are now available, check this out:<\/p>\n\n\n\n<figure class=\"wp-block-image size-full\"><img loading=\"lazy\" decoding=\"async\" width=\"645\" height=\"141\" src=\"https:\/\/guillaumesblog.net\/wp-content\/uploads\/2025\/11\/sort.png\" alt=\"\" class=\"wp-image-547\" srcset=\"https:\/\/guillaumesblog.net\/wp-content\/uploads\/2025\/11\/sort.png 645w, https:\/\/guillaumesblog.net\/wp-content\/uploads\/2025\/11\/sort-300x66.png 300w\" sizes=\"auto, (max-width: 709px) 85vw, (max-width: 909px) 67vw, (max-width: 984px) 61vw, (max-width: 1362px) 45vw, 600px\" \/><\/figure>\n\n\n\n<p class=\"wp-block-paragraph\">Meanwhile, we start the second query, which stores the aggregations in Object Storage for long-term compliance and further analysis. For that, we download the <a href=\"https:\/\/github.com\/oracle\/oci-hdfs-connector?tab=readme-ov-file\">HDFS Connector for Object Storage<\/a> and copy the contents of oci-hdfs\/lib into the jars folder of our SPARK_HOME directory, which adds the functionality Spark needs to connect to OCI Object Storage. 
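<\/p>\n\n\n\n<p class=\"wp-block-paragraph\">If you prefer to script that copy step, here is a small sketch in Python. It assumes the connector archive has been unzipped and that its <code>lib<\/code> folder holds the jars to install (check the connector&#8217;s README for the exact layout of the release you download); <code>install_connector_jars<\/code> is a hypothetical helper, and it reads <code>SPARK_HOME<\/code> from the environment when no path is given.<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>import os\nimport shutil\nfrom pathlib import Path\n\n\ndef install_connector_jars(connector_lib, spark_home=None):\n    # Spark adds every jar in SPARK_HOME\/jars to its classpath at startup.\n    target = Path(spark_home or os.environ&#91;'SPARK_HOME']) \/ 'jars'\n    target.mkdir(parents=True, exist_ok=True)\n    copied = &#91;]\n    for jar in sorted(Path(connector_lib).glob('*.jar')):\n        shutil.copy2(jar, target \/ jar.name)\n        copied.append(jar.name)\n    return copied<\/code><\/pre>\n\n\n\n<p class=\"wp-block-paragraph\">For example, <code>install_connector_jars('oci-hdfs\/lib')<\/code> copies the jars into <code>$SPARK_HOME\/jars<\/code>; restart Spark afterwards so the new jars are picked up.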
<\/p>\n\n\n\n<p class=\"wp-block-paragraph\">Also note that I&#8217;d like the output to write in parquet format but for some reason Spark won&#8217;t let me due to some errors in my config&#8230; JSON is a solution that works though&#8230; The reason why I am so keen about parquet is because I would like to sort the parquet files later on with Iceberg, but I leave it here for now.<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>    query = grouped_by_time_df.writeStream.outputMode(\"append\") \\\n            .format(\"json\") \\\n            .option(\"path\", \"oci:\/\/bucket@randomtenancy\/records\/results\") \\\n            .option(\"checkpointLocation\", \"oci:\/\/bucket@randomtenancy\/checkpoints\") \\\n            .trigger(processingTime = '5 minutes')\\\n            .start()<\/code><\/pre>\n\n\n\n<p class=\"wp-block-paragraph\">And there you have it: real-time alerts are entered into the system, and historical data is output every 5 minutes into a bucket. We could have aligned the outputs generated with the &#8220;groupBy&#8221; windows to 15 minutes &#8211; that would make more sense &#8211; but just wanted to show here some Spark behavior, when it does not detect any change or new data it will write some empty 0 bytes placeholder objects instead. 
Note the <code>checkpointLocation<\/code> option: it is there for fault tolerance. Spark records the Kafka offsets it has processed and the state of the open windows there, so if it crashes it knows where to resume.<\/p>\n\n\n\n<figure class=\"wp-block-image size-large\"><img loading=\"lazy\" decoding=\"async\" width=\"1024\" height=\"496\" src=\"https:\/\/guillaumesblog.net\/wp-content\/uploads\/2025\/11\/object_storage-1024x496.png\" alt=\"\" class=\"wp-image-537\" srcset=\"https:\/\/guillaumesblog.net\/wp-content\/uploads\/2025\/11\/object_storage-1024x496.png 1024w, https:\/\/guillaumesblog.net\/wp-content\/uploads\/2025\/11\/object_storage-300x145.png 300w, https:\/\/guillaumesblog.net\/wp-content\/uploads\/2025\/11\/object_storage-768x372.png 768w, https:\/\/guillaumesblog.net\/wp-content\/uploads\/2025\/11\/object_storage-1536x744.png 1536w, https:\/\/guillaumesblog.net\/wp-content\/uploads\/2025\/11\/object_storage-1200x581.png 1200w, https:\/\/guillaumesblog.net\/wp-content\/uploads\/2025\/11\/object_storage.png 1879w\" sizes=\"auto, (max-width: 709px) 85vw, (max-width: 909px) 67vw, (max-width: 1362px) 62vw, 840px\" \/><\/figure>\n\n\n\n<p class=\"wp-block-paragraph\">This concludes our journey for now! We react to near-real-time data: the data that matters to us is available almost instantly, ready to be processed at scale, and the processing is fault tolerant. The streaming queries also run continuously, so in a sense the application is always listening for new data. Kafka and Spark do all that heavy lifting for us in a few lines of code; aren&#8217;t they impressive pieces of software?<\/p>\n\n\n\n<p class=\"wp-block-paragraph\">This is roughly the principle that powers ride-share location tracking, or alerts when a dodgy credit card transaction happens.<\/p>\n\n\n\n<p class=\"wp-block-paragraph\">Hope this was useful, see you soon!<\/p>\n","protected":false},"excerpt":{"rendered":"<p>To illustrate how this works let&#8217;s take a look at two pervasive technologies: Spark and Kafka. 
These two have become hugely popular because of the need for real time data processing &#8211; for instance self driving cars, taxi sharing apps, fraud detection or any application where the user needs to be warned immediately. Let&#8217;s build &hellip; <a href=\"https:\/\/guillaumesblog.net\/index.php\/how-do-companies-collect-data-and-act-on-it-almost-in-real-time\/\" class=\"more-link\">Continue reading<span class=\"screen-reader-text\"> &#8220;What software allows companies to react to inputs in real-time?&#8221;<\/span><\/a><\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"closed","ping_status":"","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1],"tags":[],"class_list":["post-503","post","type-post","status-publish","format-standard","hentry","category-conversation"],"_links":{"self":[{"href":"https:\/\/guillaumesblog.net\/index.php\/wp-json\/wp\/v2\/posts\/503","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/guillaumesblog.net\/index.php\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/guillaumesblog.net\/index.php\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/guillaumesblog.net\/index.php\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/guillaumesblog.net\/index.php\/wp-json\/wp\/v2\/comments?post=503"}],"version-history":[{"count":52,"href":"https:\/\/guillaumesblog.net\/index.php\/wp-json\/wp\/v2\/posts\/503\/revisions"}],"predecessor-version":[{"id":567,"href":"https:\/\/guillaumesblog.net\/index.php\/wp-json\/wp\/v2\/posts\/503\/revisions\/567"}],"wp:attachment":[{"href":"https:\/\/guillaumesblog.net\/index.php\/wp-json\/wp\/v2\/media?parent=503"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/guillaumesblog.net\/index.php\/wp-json\/wp\/v2\/categories?post=503"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/guillaumesblog.net\/index.php\/wp-json\/wp\/v2\/tags?post=50
3"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}