PySpark – Search Table in Spark Database


In a real-world scenario, you may be dealing with petabytes of data and thousands of tables spread across hundreds of databases in the Spark or Hive catalog. Manually hunting for a particular table is time consuming, so it is a good idea to develop reusable code that searches for table(s) in a given database. Typical uses include safely dropping table(s) and identifying a table's structure. In this article, we will learn how to search for a table in a database using PySpark.

Search Table in Database using PySpark

Spark stores the details about database objects such as tables, functions, temporary tables, and views in the Spark SQL Metadata Catalog. If you are coming from a relational database such as MySQL, you can think of it as a data dictionary or metadata store.

Spark provides many catalog APIs that you can use to get information such as table details from the catalog. The section at the end of this article lists the Spark catalog APIs supported in PySpark.
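
For example, the catalog object on the Spark session answers basic questions such as which database you are in and which databases exist. Here is a minimal sketch (the app name is illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("catalog-demo").getOrCreate()

# Name of the current default database for this session
print(spark.catalog.currentDatabase())

# All databases visible to the catalog
for db in spark.catalog.listDatabases():
    print(db.name)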

List All Tables in a Database using PySpark Catalog API

Consider the following example, which uses the spark.catalog.listTables() PySpark API to list all tables present in the current database.

from pyspark.sql import SparkSession

# Create or reuse a Spark session
spark = SparkSession.builder.appName("test").getOrCreate()

# Print the name of every table/view in the current database
for table in spark.catalog.listTables():
    print(table.name)

Following is the output.

table1
table2
table3
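
listTables() also accepts a database name, and each entry carries more than just the name. A quick sketch, assuming a database called test_db exists:

# List tables/views of a specific database with a few of their attributes
for table in spark.catalog.listTables("test_db"):
    print(table.name, table.database, table.tableType, table.isTemporary)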

Check if Table Exists in Database using PySpark Catalog API

The following example is a slightly modified version of the one above; it checks whether a particular table exists in the database.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").getOrCreate()

# Check whether any table/view in the current database is named "table1"
if any(table.name == "table1" for table in spark.catalog.listTables()):
    print("Table Found!!")

Following is the output.

Table Found!!
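
If you are on PySpark 3.3 or later, note that the catalog also exposes tableExists(), which performs this check in a single call:

# Available in PySpark 3.3+; returns True if the table/view exists
spark.catalog.tableExists("table1")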

Python Function to Check if Table Exists in Database using PySpark

You can write your own helper function to search for a table in the database using PySpark (note that this is a plain Python function, not a Spark SQL UDF). Following is the complete function that searches for a table in a given database.

# Returns True if the table exists in the given database, False otherwise
def search_object(database, table):
    return any(tbl.name == str(table) for tbl in spark.catalog.listTables(database))

Following is the output.

# Returns True if table present. Otherwise, False
>>> search_object('test_db', 'table1')
True

>>> search_object('test_db', 'table10')
False
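
One practical use of this function is the "safely dropping table(s)" scenario from the introduction: drop a table only after confirming it exists. A minimal sketch, with illustrative database and table names:

# Drop test_db.table1 only if it actually exists
if search_object("test_db", "table1"):
    spark.sql("DROP TABLE test_db.table1")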

PySpark Catalog API

Not all Spark catalog APIs are supported in PySpark. Following is the list of Spark catalog APIs supported in PySpark.

currentDatabase() - Returns the current default database in this session.
setCurrentDatabase(dbName) - Sets the current default database in this session.
listDatabases() - Returns a list of databases available across all sessions.
listTables(dbName=None) - Returns a list of tables/views in the specified database. Uses the current database if none is provided.
listFunctions(dbName=None) - Returns a list of functions registered in the specified database. Uses the current database if none is provided.
listColumns(tableName, dbName=None) - Returns a list of columns for the given table/view in the specified database. Uses the current database if none is provided.
createTable(tableName, path=None, source=None, schema=None, **options) - Creates a table based on the dataset in a data source and returns the DataFrame associated with the table.
dropGlobalTempView(viewName) - Drops the global temporary view with the given view name from the catalog. If the view was cached, it is also uncached. Returns True if the view is dropped successfully, False otherwise.
dropTempView(viewName) - Drops the local temporary view with the given view name from the catalog. If the view was cached, it is also uncached. Returns True if the view is dropped successfully, False otherwise.
isCached(tableName) - Returns True if the table is currently cached in memory.
recoverPartitions(tableName) - Recovers all the partitions of the given table and updates the catalog. Only works with a partitioned table, not a view.
refreshByPath(path) - Invalidates and refreshes all the cached data for any DataFrame that contains the given data source path.
refreshTable(tableName) - Invalidates and refreshes all the cached data and metadata of the given table.
cacheTable(tableName) - Caches the specified table in memory.
uncacheTable(tableName) - Removes the specified table from the in-memory cache.
clearCache() - Removes all cached tables from the in-memory cache.
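
As an example of the "identify table structure" use case from the introduction, listColumns() returns each column's name, type, and nullability. A small sketch, assuming a table test_db.table1:

# Inspect the structure of test_db.table1
for col in spark.catalog.listColumns("table1", dbName="test_db"):
    print(col.name, col.dataType, col.nullable)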

Hope this helps 🙂