Streaming and Batch Unification – Data Sources and Ingestion

Delta tables handle streaming, batch processing, and querying with good performance and reliability. As a result, whether your data analytics solution relies on streaming or batching, the approach for ingesting, transforming, and serving the data is the same.

Time Travel

Delta is the default table format on Azure Databricks from runtime version 8.0, so any table you create there is a delta table, stored in the Hive metastore with the added Delta capabilities. To confirm which version of the Azure Databricks runtime you are on, execute the following snippet:

spark.conf.get("spark.databricks.clusterUsageTags.sparkVersion")
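
If you want to act on that value in code rather than read it by eye, the following is a minimal sketch. It assumes the returned string starts with a major.minor pair, for example 10.4.x-scala2.12; that format and the helper names parse_runtime_version and delta_is_default are illustrative assumptions, not part of any Databricks API.

```python
import re


def parse_runtime_version(spark_version: str) -> tuple:
    """Extract (major, minor) from a runtime string.

    Assumes the string begins with digits.digits, e.g. '10.4.x-scala2.12'.
    """
    match = re.match(r"(\d+)\.(\d+)", spark_version)
    if match is None:
        raise ValueError(f"Unrecognized runtime version: {spark_version!r}")
    return int(match.group(1)), int(match.group(2))


def delta_is_default(spark_version: str) -> bool:
    """True when the runtime is 8.0 or later, the threshold named in the text."""
    return parse_runtime_version(spark_version) >= (8, 0)
```

In a notebook you could call delta_is_default(spark.conf.get("spark.databricks.clusterUsageTags.sparkVersion")), where spark is the session the notebook already provides.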

Having a delta table provides version control, which allows you to query past versions of the data. This is useful because access to historical versions lets you analyze how the data changed over time. Additionally, if queries that worked on a previous version of the data no longer work, you can point them to an older version to get them running again, and then debug them against the current data. You can list the available versions of the table by using the following SQL command, which is followed by its output:

DESCRIBE HISTORY default.BRAINWAVES;
+---------+------------------------------+--------+----------+-----------+-----+
| version | timestamp                    | userId | userName | operation |   … |
+---------+------------------------------+--------+----------+-----------+-----+
| 1       | 2022-03-15T09:06:05.000+0000 | 33515  | benperk  | WRITE     |   … |
| 0       | 2022-03-15T09:02:13.000+0000 | 33515  | benperk  | WRITE     |   … |
+---------+------------------------------+--------+----------+-----------+-----+

Using the timestamp as a parameter of your query, you can target the execution of the command on that version of the data, as follows:

SELECT COUNT(1) FROM default.BRAINWAVES
TIMESTAMP AS OF '2022-03-15T09:06:05.000+0000';
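
To see which version a given timestamp lands on, it can help to model the lookup yourself. The sketch below assumes that a timestamp query resolves to the latest version committed at or before that timestamp, which is my understanding of the behavior rather than something stated here. The HISTORY list copies the two rows from the DESCRIBE HISTORY output above; version_at is a hypothetical helper and does not call Spark.

```python
from datetime import datetime

# (version, timestamp) pairs copied from the DESCRIBE HISTORY output
HISTORY = [
    (1, "2022-03-15T09:06:05.000+0000"),
    (0, "2022-03-15T09:02:13.000+0000"),
]


def _parse(ts: str) -> datetime:
    """Parse timestamps in the format shown by DESCRIBE HISTORY."""
    return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%f%z")


def version_at(history, ts: str):
    """Return the newest version committed at or before ts, or None.

    Assumption: this mirrors how a TIMESTAMP AS OF query picks a version.
    """
    target = _parse(ts)
    candidates = [v for v, committed in history if _parse(committed) <= target]
    return max(candidates) if candidates else None
```

With this history, a timestamp between the two commits resolves to version 0, and the timestamp used in the query above resolves to version 1.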

You can run the same kind of query from Python by using the versionAsOf option, which selects a version by number instead of by timestamp:

brainwaves = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("/FileStore/data/2022/03/14")
print(brainwaves.count())

Both the timestamp and version query options are available in all languages supported by Apache Spark.
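
As a sketch of how the two options sit side by side in Python, the helper below builds the reader option for either a version number or a timestamp and then loads the table. The option names versionAsOf and timestampAsOf are the Delta reader options; the function names time_travel_options and read_delta_as_of are hypothetical, and spark is assumed to be an existing SparkSession with Delta available.

```python
def time_travel_options(version=None, timestamp=None) -> dict:
    """Build the Delta reader option for exactly one of version or timestamp."""
    if (version is None) == (timestamp is None):
        raise ValueError("Pass exactly one of version or timestamp")
    if version is not None:
        return {"versionAsOf": str(version)}
    return {"timestampAsOf": timestamp}


def read_delta_as_of(spark, path, version=None, timestamp=None):
    """Load a delta table at a past version or timestamp.

    `spark` is an existing SparkSession (an assumption of this sketch).
    """
    options = time_travel_options(version, timestamp)
    return spark.read.format("delta").options(**options).load(path)
```

For example, read_delta_as_of(spark, "/FileStore/data/2022/03/14", timestamp="2022-03-15T09:06:05.000+0000").count() would be the timestamp-based counterpart of the versionAsOf snippet. Refusing to accept both arguments at once keeps it unambiguous which snapshot is being read.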

The value passed to versionAsOf comes from the version column in the DESCRIBE HISTORY output (in this example, a 0) and determines the version on which the count() method is executed. If you change the 0 to 1, you will get different results, as expected. Besides helping you debug, this technique lets you roll back to a previous version or create a new, permanent table from a version of the old data. It is important to note that the history is not stored indefinitely; therefore, if you need a copy of a previous version, you need to make that copy permanent by inserting the historical data into another delta table.
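
The rollback and the permanent copy can both be expressed as SQL statements. The sketch below only builds the statement text, so you can inspect it before running it with spark.sql. It assumes your runtime supports the Delta RESTORE TABLE command and the VERSION AS OF clause in a SELECT; the helper names and the target table default.BRAINWAVES_V0 are hypothetical.

```python
def restore_sql(table: str, version: int) -> str:
    """Statement that rolls the table itself back to an earlier version."""
    return f"RESTORE TABLE {table} TO VERSION AS OF {int(version)}"


def snapshot_copy_sql(source: str, target: str, version: int) -> str:
    """Statement that copies one historical version into a new delta table."""
    return (
        f"CREATE TABLE {target} USING DELTA AS "
        f"SELECT * FROM {source} VERSION AS OF {int(version)}"
    )


# Hypothetical target table name, used for illustration only
copy_statement = snapshot_copy_sql("default.BRAINWAVES", "default.BRAINWAVES_V0", 0)
```

Running spark.sql(copy_statement) would leave the current table untouched and preserve version 0 in its own table, which is the safer choice when you only need the old data to outlive the history retention.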
