Spark SQL/Hive.. - Interview questions for Big Data engineers


Note : This is an attempt to cover some of the common use cases/concepts here; I will keep adding more in the future. Please don't forget to share/like this article if you find this effort worthwhile.

These are some of the commonly asked questions in SQL:

1. Question : Select the Nth maximum salary from a table :

Write a query to select the Nth maximum salary from emp_dept_tbl (or) write a query to find the 2nd or 3rd max salary from the EMP table (or) write a query to find the 10 highest salaries (or) write a query to find the 4th highest salary (without/with analytic functions)

We can achieve this by using a correlated subquery. In the example below we get the 5th highest salary without using an analytic function.

select * from emp_dept_tbl emp1
where (5-1) =
(
  select count(distinct emp2.salary) from emp_dept_tbl emp2
  where emp2.salary > emp1.salary
)
        

In the example below we get the 5th highest salary by using the DENSE_RANK analytic function.

select * from
(
  select e.*, DENSE_RANK() over (order by salary DESC) RN from emp_dept_tbl e
)
where RN = 5

2. Question : Select the top N salaries from each department of the EMP table

Answer : Write a query to select the top N salaries from each department of the emp_dept_tbl table (or) write a query to select the maximum N salaries from each department of the EMP table.

We can achieve this by using the DENSE_RANK analytic function. In the example below we get the top 3 salaries for each department.

select * from
(
  select e.*, DENSE_RANK() over (partition by department order by salary DESC) RN
  from emp_dept_tbl e
)
where RN <= 3

Note : Below is the Spark code for questions 1 and 2, to help understand the queries better.

// Build a small in-memory CSV dataset and register it as a temp view
import org.apache.spark.sql.Dataset
import spark.implicits._ // required for toDS()

val csvData: Dataset[String] = spark.sparkContext.parallelize(
  """
    |ID,FIRST_NAME,LAST_NAME,DESIGNATION,DEPARTMENT,SALARY
    |1001,Ram,Ghadiyaram,Director of Sales,Sales,30000
    |1002,Ravi,Rangasamy,Marketing Manager,Sales,25000
    |1003,Ramesh, Rangasamy,Assistant Manager,Sales,25000
    |1004,Prem,Sure,Account Coordinator,Account,15000
    |1005,Phani ,G,Accountant II,Account,20000
    |1006,Krishna,G,Account Coordinator,Account,15000
    |1007,Rakesh,Krishnamurthy,Assistant Manager,Sales,25000
    |1008,Gally,Johnson,Manager,Account,28000
    |1009,Richard,Grill,Account Coordinator,Account,12000
    |1010,Sofia,Ketty,Sales Coordinator,Sales,20000
    |""".stripMargin.lines.toList).toDS()

// Read the CSV lines into a DataFrame and expose it to Spark SQL
val frame = spark.read.option("header", true).option("inferSchema", true).csv(csvData)
frame.show()
frame.createOrReplaceTempView("emp_dept_tbl")


println("5th largest salary")


spark.sql ("""
select * from 
         ( select e.*, DENSE_RANK() over (order by salary DESC) RN 
from emp_dept_tbl e )
where RN=5 
""".stripMargin).show


println("TOP 3 salaries for each department of the emp_dept_tbl table using analytical function ")
spark.sql("""
 select * from 
( select e.*, DENSE_RANK() over (partition by department order by salary DESC) RN  from emp_dept_tbl e ) 
where RN <=3 
""").show
        

Result :

+----+----------+-------------+-------------------+----------+------+
|  ID|FIRST_NAME|    LAST_NAME|        DESIGNATION|DEPARTMENT|SALARY|
+----+----------+-------------+-------------------+----------+------+
|1001|       Ram|   Ghadiyaram|  Director of Sales|     Sales| 30000|
|1002|      Ravi|    Rangasamy|  Marketing Manager|     Sales| 25000|
|1003|    Ramesh|    Rangasamy|  Assistant Manager|     Sales| 25000|
|1004|      Prem|         Sure|Account Coordinator|   Account| 15000|
|1005|    Phani |            G|      Accountant II|   Account| 20000|
|1006|   Krishna|            G|Account Coordinator|   Account| 15000|
|1007|    Rakesh|Krishnamurthy|  Assistant Manager|     Sales| 25000|
|1008|     Gally|      Johnson|            Manager|   Account| 28000|
|1009|   Richard|        Grill|Account Coordinator|   Account| 12000|
|1010|     Sofia|        Ketty|  Sales Coordinator|     Sales| 20000|
+----+----------+-------------+-------------------+----------+------+

5th largest salary
+----+----------+---------+-------------------+----------+------+---+
|  ID|FIRST_NAME|LAST_NAME|        DESIGNATION|DEPARTMENT|SALARY| RN|
+----+----------+---------+-------------------+----------+------+---+
|1004|      Prem|     Sure|Account Coordinator|   Account| 15000|  5|
|1006|   Krishna|        G|Account Coordinator|   Account| 15000|  5|
+----+----------+---------+-------------------+----------+------+---+

TOP 3 salaries for each department of the emp_dept_tbl table using analytical function 
+----+----------+-------------+-------------------+----------+------+---+
|  ID|FIRST_NAME|    LAST_NAME|        DESIGNATION|DEPARTMENT|SALARY| RN|
+----+----------+-------------+-------------------+----------+------+---+
|1008|     Gally|      Johnson|            Manager|   Account| 28000|  1|
|1005|    Phani |            G|      Accountant II|   Account| 20000|  2|
|1004|      Prem|         Sure|Account Coordinator|   Account| 15000|  3|
|1006|   Krishna|            G|Account Coordinator|   Account| 15000|  3|
|1001|       Ram|   Ghadiyaram|  Director of Sales|     Sales| 30000|  1|
|1002|      Ravi|    Rangasamy|  Marketing Manager|     Sales| 25000|  2|
|1003|    Ramesh|    Rangasamy|  Assistant Manager|     Sales| 25000|  2|
|1007|    Rakesh|Krishnamurthy|  Assistant Manager|     Sales| 25000|  2|
|1010|     Sofia|        Ketty|  Sales Coordinator|     Sales| 20000|  3|
+----+----------+-------------+-------------------+----------+------+---+

        

3. Question : Select/delete duplicate rows from the EMP table (Oracle) : Write a query to select/delete duplicate rows from the EMP table.

Answer: In Oracle, we can achieve this by using the ROWID pseudo column.

select * from emp where rowid not in
(
select min(rowid) from emp group by empno
);

delete from emp
where rowid not in
(
select min(rowid) from emp group by empno
);        
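Note : ROWID is an Oracle pseudo column; Hive and Spark SQL do not have it. A minimal Spark SQL sketch of the same idea, assuming empno is the column that defines a duplicate, keeps one arbitrary row per empno using ROW_NUMBER:

select * from
(
  select e.*, row_number() over (partition by empno order by empno) rn
  from emp e
) t
where rn = 1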

4. Question : Same salary - Write a query to select only those employees who earn the same salary as another employee.

Answer:

We can achieve this in at least 3 ways:

Method 1:

select e1.* 
from emp e1,emp e2 
where e1.sal=e2.sal
and e1.ename <> e2.ename         

Method 2 :

select * from emp
 where sal in
(select sal from emp
  group by sal
having count(sal)>=2 ) 
        

Method 3:

SELECT *
FROM
(
  SELECT e.*, count(*) over (partition by sal) cnt FROM emp e
)
WHERE cnt >= 2;

5. Question : Odd/Even rows : Write a query to display even/odd number rows from a table.

Answer:

We can achieve this by using the ROWNUM pseudo column. The query below returns the odd-numbered rows; use mod(rn, 2) = 0 for the even-numbered rows. (Note that ROWNUM is assigned before ORDER BY is applied, so for a guaranteed ordering assign rn in an outer query over an already-ordered subquery.)

select * from
(
  select empno, ename, sal, rownum rn
  from emp
  order by empno
)
where mod(rn, 2) <> 0
order by rn

6. Question : More than 2 employees under a manager - Write a query to display the employee information for managers who have more than 2 employees reporting to them.

Answer: We can achieve this by using the COUNT analytic function.

select * from
(
  SELECT e.*, count(mgr) over (partition by mgr) as cnt from emp e
)
where cnt > 2

7. Question : Maximum salary without using functions - Write a query to find the maximum salary from the EMP table without using aggregate functions.

Answer: We can achieve this by using a self join.

select * from emp
where sal not in 
(
select A.sal from emp A, emp B where A.sal < B.sal 
) 
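Another common alternative in Hive/Spark SQL, if ORDER BY with LIMIT is acceptable (neither is an aggregate function), is simply:

select sal from emp
order by sal desc
limit 1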
        

8. Question : Find the number of rows in a table without using the COUNT function (Write a query to find the number of rows in a table without using the COUNT function.)

Answer :

SELECT MAX(rn) FROM
(
SELECT ROW_NUMBER() OVER(ORDER BY empno DESC) as rn FROM emp
)        

9. Question : Find the LAST inserted record in a table (Write a query to find the LAST inserted record in a table.) Answer: If you want the last record inserted, you need a timestamp or sequence number assigned to each record as it is inserted; then you can use the query below.

select * from t 
where TIMESTAMP_COLUMN = (select max(timestamp_column) from T) 
and rownum = 1;         

10. Question : Select the LAST N records from a table

Write a query to select the last N records from a table, or explain the query below.

select * from emp 
minus 
select * from emp where rownum <=(select count(*) - &n from emp);        

We all know that when you execute a SQL statement, the subquery is executed first.

#1: If you run "select count(*) - &n from emp" against an EMP table with 10 rows and N = 3, the output is 10 - 3, i.e. 7.

#2: Oracle then executes "select * from emp where rownum <= 7", which fetches the first 7 rows.

#3: Oracle then runs "select * from emp", which fetches all 10 rows.

#4: Finally, Oracle takes the 10 rows from step 3 and subtracts (MINUS) the 7 rows from step 2, leaving the last 3 rows.

11. Question : Past 5 years

Write a query to find the employees who have been working in the company for the past 5 years.

Answer : We can achieve this using the ADD_MONTHS function (also available among the Spark DataFrame functions).

select * from emp 
where hiredate < add_months(sysdate,-60);        
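A minimal Spark SQL equivalent, assuming the same hiredate column; current_date() plays the role of Oracle's SYSDATE:

select * from emp
where hiredate < add_months(current_date(), -60)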

12. Question : What is the difference between the IN and EXISTS clauses for the query below? Which will have better performance? What is the equivalent in Spark SQL?

SELECT *
FROM Customers
WHERE EXISTS (
    SELECT *
    FROM Orders
    WHERE Orders.CustomerID = Customers.ID
)


--  can be rewritten to:

SELECT *
FROM Customers
WHERE ID IN (
    SELECT CustomerID
    FROM Orders
)

-- or with a join:

SELECT Customers.*
FROM Customers
    INNER JOIN Orders ON Customers.ID = Orders.CustomerID

        

Answer : There are several general arguments on this:

  • EXISTS is usually faster than IN when the subquery result is very large.
  • IN is usually faster than EXISTS when the subquery result is very small.
  • Note : the same thing can be achieved with the join shown above; in Spark SQL it maps to a left semi join, as sketched below.
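In Spark SQL both IN and EXISTS subqueries are supported, and the optimizer typically plans them as a left semi join, which can also be written directly; a minimal sketch:

SELECT Customers.*
FROM Customers
LEFT SEMI JOIN Orders ON Customers.ID = Orders.CustomerID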

13. Question : What is the difference between the WHERE and HAVING clauses in SQL?

Answer : WHERE filters individual rows before any grouping/aggregation happens, so it cannot reference aggregate functions. HAVING filters groups after the GROUP BY aggregation, so it can reference aggregates such as COUNT or SUM. (See https://www.geeksforgeeks.org for a fuller analysis; credits to them.)
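A small example, assuming the classic EMP columns deptno and sal; WHERE is applied before grouping, HAVING after aggregation:

select deptno, count(*) as cnt
from emp
where sal > 1000      -- row-level filter, applied before grouping
group by deptno
having count(*) > 3   -- group-level filter, applied after aggregation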

14. Question : What is the difference between UNION and UNION ALL? What is the equivalent in Spark?

Answer : The main difference between UNION and UNION ALL is that UNION removes duplicate records, while UNION ALL does not remove duplicate records.

E.g. Consider two tables A and B

A   B
10  15
20  20

UNION of A and B = 10, 20, 15

UNION ALL of A and B = 10, 20, 15, 20        

Performance of UNION ALL is considered better than UNION, since UNION ALL does not require the additional work of removing duplicates.

With respect to Spark :

DataFrame union() – the union() method of the DataFrame is used to combine two DataFrames of the same structure/schema; if the schemas are not the same it returns an error. Note that, unlike SQL UNION, it does not remove duplicates - chain .distinct() after it for that behavior.

DataFrame unionAll() – unionAll() is deprecated since Spark version 2.0.0 and replaced with union().

15. What is the difference between GROUP BY and DISTINCT?

  • GROUP BY is meant for aggregating/grouping rows, whereas DISTINCT is just used to get distinct values.
  • DISTINCT is used to find unique/distinct records, whereas GROUP BY groups a selected set of rows into summary rows by one or more columns or an expression.
  • When no aggregate function is present, GROUP BY gives the same result as DISTINCT; the two queries below return the same result.
  • Essentially, DISTINCT collects all of the rows, including any expressions that need to be evaluated, and then tosses out duplicates, whereas GROUP BY can (in some cases) filter out the duplicate rows before performing any of that work.
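For example, with the emp_dept_tbl table used earlier, these two queries return the same result:

select distinct department from emp_dept_tbl;

select department from emp_dept_tbl
group by department;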

16. Query to find records in Table A not in Table B without using NOT IN.?

Example : take Employee and Department tables

select d.deptname, nvl(max(e.salary), 0)
from Department d
left outer join Employee e on e.deptid = d.id
group by d.deptname
        

If you don't want the matching rows but only the left table's rows, use a left anti join instead of the left outer join above.

Left anti join (like a NOT IN clause) : an anti join returns rows from the left side of the relation that have no match on the right.

Left semi join (like an IN clause) : a semi join returns rows from the left side of the relation that have a match on the right.
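A minimal Spark SQL sketch of the anti-join form, which directly answers the original question (rows in one table with no match in the other):

select e.*
from Employee e
left anti join Department d on e.deptid = d.id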

Based on the exact question, the answer varies between these joins.

17 Q.   What is the difference between DELETE and TRUNCATE in SQL?

Main differences between DELETE and TRUNCATE commands are:

  •    DML vs. DDL: DELETE is a Data Manipulation Language (DML) command. TRUNCATE is a Data Definition Language (DDL) command.
  • Number of Rows: We can use DELETE command to remove one or more rows from a table. TRUNCATE command will remove all the rows from a table.
  • WHERE clause: DELETE command provides support for WHERE clause that can be used to filter the data that we want to delete. TRUNCATE command can only delete all the rows. There is no WHERE clause in TRUNCATE command.
  • Commit: After DELETE command we have to issue COMMIT or ROLLBACK command to confirm our changes. After TRUNCATE command there is no need to run COMMIT. Changes done by TRUNCATE command can not be rolled back.
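A quick illustration of both commands, assuming the classic EMP table:

DELETE FROM emp WHERE deptno = 10;  -- removes only matching rows; can be rolled back
TRUNCATE TABLE emp;                 -- removes all rows; DDL, cannot be rolled back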

18 Q.   What is Merge statement in SQL(general syntax almost all modern databases supports merge statement)?

A MERGE statement is a combination of INSERT and UPDATE statements. If a row is already present in the target table, it updates the existing data; if not, it inserts the data.

Put simply, it is an implementation of Slowly Changing Dimension Type 1.

MERGE is a DML statement, so we may need to run a COMMIT or ROLLBACK command after it.

Sample syntax for MERGE is:

MERGE
INTO target_table tg_table
USING source_table src_table
ON ( src_table.id = tg_table.id )
WHEN MATCHED
THEN
    UPDATE
    SET  tg_table.name = src_table.name
WHEN NOT MATCHED
THEN
      INSERT ( tg_table.id, tg_table.name )
      VALUES ( src_table.id, src_table.name );        
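In Spark SQL, MERGE INTO is available for Delta Lake tables with essentially the same syntax; a minimal sketch, assuming target and source are Delta tables:

MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.name = s.name
WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.id, s.name);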

19 Q.   Write a query for this problem?

Given a table Employee in which we have DeptId for each employee. Write a single SQL query to move the employees from DeptID 1 to 2 and move employees from DeptId 2 to 1.

Employee

Id Name   DeptId
1  John   1
2  George 2
3  Jane   1
4  Smith  2

Answer: We can use CASE statement here.

UPDATE Employee SET DeptId =
  CASE DeptId
    WHEN 1 THEN 2
    WHEN 2 THEN 1
    ELSE DeptId
  END;

20 Q.   Write SQL Query to find duplicate rows in a database?

Answer: To find duplicate rows, we first have to ask the interviewer what the criteria is for considering two rows duplicates of each other.

Let's say that in a given table Test_table, two rows are considered equal if their column_1 and column_2 values are the same.

We can use the GROUP BY clause to group the rows on the columns used for the equality check. Any group with more than 1 row contains duplicates.

Query to find duplicate will be as follows:

SELECT column_1, column_2, count(*)
FROM Test_table
GROUP BY column_1, column_2
HAVING count(*) > 1

Another way is using SQL window function on the column name.

Example (used hive and databricks) :

%sql
drop table if exists stuff;
CREATE TABLE stuff(
  id INTEGER NOT NULL,
  name VARCHAR(60) NOT NULL,
  city VARCHAR(60) NOT NULL
);
show tables in default;
INSERT INTO stuff VALUES (904834, 'jim', 'London');
INSERT INTO stuff VALUES (904835, 'jim', 'London');
INSERT INTO stuff VALUES (90145, 'Fred', 'Paris');
INSERT INTO stuff VALUES (90132, 'Fred', 'Paris');
INSERT INTO stuff VALUES (90133, 'Fred', 'Paris');
INSERT INTO stuff VALUES (923457, 'Barney', 'New York');

select * from stuff;

Answer with windows function is :

SELECT
  t.*
FROM
  (
    SELECT
      s.*,
      COUNT(*) OVER (PARTITION BY s.name, s.city) AS qty
    FROM
      stuff s
  ) t
WHERE
  t.qty > 1
ORDER BY
  t.name,
  t.city        

Result :

id     name  city    qty
90132  Fred  Paris   3
90145  Fred  Paris   3
90133  Fred  Paris   3
904835 jim   London  2
904834 jim   London  2

Here name and city are the columns used to detect duplicates.

21 Q.   What is the main difference between RANK and DENSE_RANK functions ?

Both RANK and DENSE_RANK functions are used to get the ranking of an ordered partition.

The main difference between RANK and DENSE_RANK is how they handle ties while ranking the data.

In a tie, RANK assigns the same rank to the tied values and then skips the following rank(s), so there are gaps in the ranking.

In a tie, DENSE_RANK also assigns the same rank to the tied values but does not skip ranks; the next rank is consecutive.

Example 1: In set (10, 10, 20, 30, 30, 40), RANK returns (1,1,3,4,4,6) DENSE_RANK returns (1,1,2,3,3,4)

Example 2 :

WITH T(Id, OrderId, ProductId)
     AS (SELECT 1,1,1 UNION ALL
         SELECT 2,1,1 UNION ALL
         SELECT 3,1,1 UNION ALL
         SELECT 4,1,2)


SELECT *,
	   ROW_NUMBER() OVER(PARTITION BY OrderId, ProductId ORDER BY Id DESC) AS 'ROW_NUMBER',
	   RANK() OVER(PARTITION BY OrderId, ProductId ORDER BY Id DESC) AS 'RANK',
	   DENSE_RANK() OVER(PARTITION BY OrderId, ProductId ORDER BY Id DESC) AS 'DENSE_RANK'
 FROM T        

Result :

Id  OrderId  ProductId  ROW_NUMBER  RANK  DENSE_RANK
3   1        1          1           1     1
2   1        1          2           2     2
1   1        1          3           3     3
4   1        2          1           1     1

(Since Id is unique within each partition there are no ties here, so all three functions agree; with tied ORDER BY values, RANK and DENSE_RANK would repeat values while ROW_NUMBER would not.)

22 Q.   What is the use of WITH clause in SQL?

In SQL, the WITH clause is used to define a named subquery (a common table expression) over a set of data.

The main uses of WITH clause are:

  • Simplify: It can simplify a SQL query by creating a subset of data.
  • Reduce Repetition: WITH clause can create a subset of data that can be reused multiple times in the main query.

Example : In the following query we use the WITH clause to get the set of employees in the Finance department, then use this subset fin_employee to filter on AGE less than 30 and on Female gender.

We use the same set fin_employee multiple times in the main query.

WITH fin_employee AS
 (SELECT * FROM Employee WHERE dep_name = 'Finance')

SELECT * FROM fin_employee WHERE AGE < 30
UNION ALL
SELECT * FROM fin_employee WHERE Gender = 'Female';


23 Q.   Write SQL Query to get Student Name and number of Students in same grade.

Given Student Table

ID | Name   | Grade
1  | George | 1
2  | Smith  | 2

Answer: We can use the WITH clause for this problem. We first get the number of students in each grade by using GROUP BY on grade, then join the subquery returned by the WITH clause in the main query.

Query will be as follows:

WITH grade_count AS (
  SELECT grade, COUNT(*) AS grade_count
  FROM student
  GROUP BY grade)

SELECT s.name AS student_name,
       gc.grade_count AS grade_count
FROM student s,
     grade_count gc
WHERE s.grade = gc.grade;

24 Q.   Write a Query to get Unique names of products without using DISTINCT keyword.

We can use GROUP BY for this purpose: grouping on prod_name returns each distinct product name exactly once.

SELECT prod_name FROM product 
GROUP BY prod_name        

25 Q.   What is the difference between Left OUTER Join and Right OUTER Join?

Let say we have two tables X and Y.

The result of a LEFT OUTER JOIN of X and Y is all rows of X plus the common rows between X and Y.

The result of a RIGHT OUTER JOIN of X and Y is all rows of Y plus the common rows between X and Y.

Example :

Consider the following two tables, each with just one column (x and y):

x  | y
---+---
10 | 30
20 | 40
30 | 50
40 | 60

In the above tables, (10, 20) are unique to table X, (30, 40) are common, and (50, 60) are unique to table Y.

LEFT OUTER JOIN : A left outer join using the following query gives us all rows in X plus the common rows of X and Y.

select * from X LEFT OUTER JOIN Y on X.x = Y.y;

x  | y
---+-----
10 | null
20 | null
30 | 30
40 | 40

RIGHT OUTER JOIN : A right outer join using the following query gives all rows in Y plus the common rows of X and Y.

select * from X RIGHT OUTER JOIN Y on X.x = Y.y;

x    | y
-----+----
30   | 30
40   | 40
null | 50
null | 60

26 Q : What is the difference between RANK, DENSE_RANK and ROW_NUMBER?

These are advanced Hive analytic (window) functions:

  1. rank() gives equal values the same rank value, leaving gaps after ties
  2. dense_rank() gives equal values the same rank value, but with no gaps as there are in rank()
  3. row_number() never gives the same value to two rows, even when they tie

27 Q: Difference between SORT BY and ORDER BY, DISTRIBUTE BY and CLUSTER BY in Hive?

  • ORDER BY performs a total ordering of the query result set. This means that all the data is passed through a single reducer, which may take an unacceptably long time to execute for larger data sets.
  • SORT BY orders the data only within each reducer, thereby performing a local ordering, where each reducer’s output will be sorted. You will not achieve a total ordering on the dataset. Better performance is traded for total ordering.
  • DISTRIBUTE BY x: ensures each of N reducers gets non-overlapping ranges of x, but doesn't sort the output of each reducer. You end up with N or more unsorted files with non-overlapping ranges.
  • CLUSTER BY x: ensures each of N reducers gets non-overlapping ranges, then sorts by those ranges at the reducers. This gives you global ordering, and is the same as doing (DISTRIBUTE BY x and SORT BY x). You end up with N or more sorted files with non-overlapping ranges.

Note : 1) CLUSTER BY is a shortcut for DISTRIBUTE BY plus SORT BY.

2) The above concepts are the same in Spark SQL as well; see the sketch below, and https://spark.apache.org/docs/latest/sql-ref-syntax-qry.html for more information.
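A minimal sketch of all four clauses on a hypothetical sales table with a country column:

-- Total ordering; all data passes through a single reducer
SELECT * FROM sales ORDER BY country;

-- Local (per-reducer) ordering only
SELECT * FROM sales SORT BY country;

-- Same country always lands on the same reducer; output unsorted
SELECT * FROM sales DISTRIBUTE BY country;

-- Shortcut for DISTRIBUTE BY country SORT BY country
SELECT * FROM sales CLUSTER BY country;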

28 Q: We have a sales table in a company and it has sales entries from salesman around the globe. How do you rank each salesperson by country based on their sales volume in Hive?

Hive supports several analytic functions, and one of them is RANK(), which is designed to do exactly this operation.

Lookup details on other window and analytic functions — https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics

Example :

hive> SELECT rep_name, rep_country, sales_volume,
             rank() over (PARTITION BY rep_country ORDER BY sales_volume DESC) as rank
      FROM salesrep;

29 Q : What is the difference between rollup and cube OLAP functions ? What is their significance ?


ROLLUP and CUBE are simple extensions to the SELECT statement's GROUP BY clause. ROLLUP creates subtotals at any level of aggregation needed, from the most detailed up to a grand total. CUBE is an extension similar to ROLLUP , enabling a single statement to calculate all possible combinations of subtotals.

Generally CUBE and ROLLUP are used for reporting purposes and they do the Subtotal and Grand total. 

CUBE generates a result set that shows aggregates for all combinations of values in the selected columns and 

ROLLUP generates a result set that shows aggregates for a hierarchy of values in the selected columns.

One important general difference is:

 1. The N elements of a ROLLUP specification correspond to N+1 GROUPING SETS.

 2. The N elements of a CUBE specification correspond to 2^N GROUPING SETS.

For example, a ROLLUP over store_id, product_type is equivalent to:

GROUP BY store_id, product_type
GROUPING SETS (
  (store_id, product_type)
, (store_id)
, ())

For 2 (n) group-by columns, the ROLLUP grouping sets have (n+1) = 3 combinations of columns.

Cube is equivalent to:

GROUP BY store_id, product_type
GROUPING SETS (
  (store_id, product_type)
, (store_id)
, (product_type)
, ())

-- () defines the empty grouping set: the grand total with no group-by columns,
-- i.e. something like select null, null, sum(amount) from sales, which needs no
-- GROUP BY clause because only aggregate expressions are selected.

For 2 (n) group-by columns, the CUBE grouping sets have (2^n) = 4 combinations of columns.
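A minimal Spark SQL/Hive sketch of both, assuming a sales table with store_id, product_type and amount columns:

-- ROLLUP: n + 1 grouping sets -> (store_id, product_type), (store_id), ()
SELECT store_id, product_type, SUM(amount) AS total
FROM sales
GROUP BY store_id, product_type WITH ROLLUP;

-- CUBE: 2^n grouping sets -> (store_id, product_type), (store_id), (product_type), ()
SELECT store_id, product_type, SUM(amount) AS total
FROM sales
GROUP BY store_id, product_type WITH CUBE;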


30 Q) Write a PySpark query for a report that provides the customer ids from the Customer table for customers who bought all the products in the Product table. Return the result table in any order. The query result format is shown in the following example:

Customer table:
+-------------+-------------+
| customer_id | product_key |
+-------------+-------------+
| 1           | 5           |
| 2           | 6           |
| 3           | 5           |
| 3           | 6           |
| 1           | 6           |
+-------------+-------------+

Product table:
+-------------+
| product_key |
+-------------+
| 5           |
| 6           |
+-------------+

Result table:
+-------------+
| customer_id |
+-------------+
| 1           |
| 3           |
+-------------+
The customers who bought all the products (5 and 6) are customers with id 1 and 3.        

Answer :

from pyspark.sql.functions import countDistinct

# Count the total number of distinct products in the 'Product' table
total_products_count = product_df.distinct().count()

# Count the number of distinct products bought by each customer
customer_product_count = (
    customer_df.groupBy("customer_id")
    .agg(countDistinct("product_key").alias("bought_products"))
)

# Keep only the customers whose distinct-product count matches the total
result_df = (
    customer_product_count
    .filter(customer_product_count.bought_products == total_products_count)
    .select("customer_id")
)

# Show the result
result_df.show()

31 Q) Customer product transactions interview questions (These questions delve into more complex scenarios, testing your ability to work with dates, calculate averages, and perform more in-depth customer analysis using PySpark.)

Customer Table
+-------------+-------------+
| customer_id | product_key |
+-------------+-------------+
| 1           | 5           |
| 2           | 6           |
| 3           | 5           |
| 3           | 6           |
| 1           | 6           |
+-------------+-------------+
Product table:
+-------------+
| product_key |
+-------------+
| 5           |
| 6           |
+-------------+        

Count of Customers by Product

from pyspark.sql.functions import count

# Count the number of customers for each product
result_df = customer_df.groupBy("product_key").agg(count("customer_id").alias("customer_count"))

# Show the result
result_df.show()        

Total Sales Amount by Customer

from pyspark.sql.functions import sum  # pyspark's sum, not the Python builtin

# Join the customer and product tables to get the sales amount
# (assumes product_df carries a sales_amount column, which is not shown
# in the sample Product table above)
sales_df = (
    customer_df.join(product_df, "product_key")
    .groupBy("customer_id")
    .agg(sum("sales_amount").alias("total_sales"))
)

# Show the result
sales_df.show()

Products Bought by Each Customer

from pyspark.sql.functions import collect_list

# Get a list of products bought by each customer
result_df = customer_df.groupBy("customer_id").agg(collect_list("product_key").alias("products_bought"))

# Show the result
result_df.show()

Top N Customers by Total Sales Amount

from pyspark.sql.window import Window
from pyspark.sql.functions import col, desc, rank, sum

# Calculate the total sales amount for each customer
# (again assumes a sales_amount column is available after the join)
sales_df = (
    customer_df.join(product_df, "product_key")
    .groupBy("customer_id")
    .agg(sum("sales_amount").alias("total_sales"))
)

# Rank customers by total sales amount
window_spec = Window.orderBy(desc("total_sales"))
ranked_df = sales_df.withColumn("rank", rank().over(window_spec))

# Get the top N customers (N is whatever cutoff you choose, e.g. 3)
N = 3
top_customers_df = ranked_df.filter(col("rank") <= N)

# Show the result
top_customers_df.show()

Result : 

+-------------+-------------+----+
| customer_id | total_sales |rank|
+-------------+-------------+----+
| 1           | 250         | 1  |
| 3           | 180         | 2  |
| 2           | 120         | 3  |
+-------------+-------------+----+
        

Customers Who Bought All Products

from pyspark.sql.functions import countDistinct

# Count the total number of distinct products
total_products_count = product_df.distinct().count()

# Count the number of distinct products bought by each customer
customer_product_count = customer_df.groupBy("customer_id").agg(countDistinct("product_key").alias("bought_products"))

# Filter customers who bought all products
result_df = customer_product_count.filter(customer_product_count.bought_products == total_products_count).select("customer_id")

# Show the result
result_df.show()

Result : 
+-------------+
| customer_id |
+-------------+
| 1           |
| 3           |
+-------------+        


Order and OrderDetails tables

Order table :
+----------+-------------+
| order_id | customer_id |
+----------+-------------+
| 1        | 1           |
| 2        | 2           |
| 3        | 1           |
+----------+-------------+
OrderDetails table :
+----------+-------------+--------+
| order_id | product_key | amount |
+----------+-------------+--------+
| 1        | 5           | 100    |
| 1        | 6           | 150    |
| 2        | 5           | 120    |
| 3        | 6           | 200    |
+----------+-------------+--------+

Calculate the average order value for each customer.

from pyspark.sql.functions import avg, sum

# Join the orders and order-details tables
joined_df = orders_df.join(order_details_df, "order_id")

# Calculate the total order value for each order
order_value_df = joined_df.groupBy("order_id", "customer_id").agg(sum("amount").alias("order_value"))

# Calculate the average order value for each customer
avg_order_value_df = order_value_df.groupBy("customer_id").agg(avg("order_value").alias("average_order_value"))

# Show the result
avg_order_value_df.show()

Expected Result:

+-------------+---------------------+
| customer_id | average_order_value |
+-------------+---------------------+
| 1           | 225.0               |
| 2           | 120.0               |
+-------------+---------------------+

(Customer 1 has two orders worth 250 and 200, so the average order value is 225.)

Customers and OrderDetails tables :

+-------------+---------+
| customer_id | name    |
+-------------+---------+
| 1           | Alice   |
| 2           | Bob     |
| 3           | Charlie |
+-------------+---------+
OrderDetails table :
+----------+-------------+--------+
| order_id | product_key | amount |
+----------+-------------+--------+
| 1        | 5           | 100    |
| 1        | 6           | 150    |
| 2        | 5           | 120    |
| 3        | 6           | 200    |
+----------+-------------+--------+

Find customers who spent more than 300 in total.

from pyspark.sql.functions import sum

# Join the customers and order-details tables
# (assumes order_details_df carries a customer_id column, or has first been
# joined with the Order table to obtain one)
joined_df = customers_df.join(order_details_df, "customer_id")

# Calculate total spending for each customer
total_spending_df = joined_df.groupBy("customer_id", "name").agg(sum("amount").alias("total_spending"))

# Filter customers who spent more than 300
result_df = total_spending_df.filter(total_spending_df.total_spending > 300)

# Show the result
result_df.show()

Result :

+-------------+---------+----------------+
| customer_id | name    | total_spending |
+-------------+---------+----------------+
| 1           | Alice   | 350            |
| 3           | Charlie | 400            |
+-------------+---------+----------------+

Sales Table:

+----------------+------------+--------------+
| transaction_id | sale_date  | sales_amount |
+----------------+------------+--------------+
| 1              | 2023-01-01 | 200          |
| 2              | 2023-01-15 | 150          |
| 3              | 2023-02-05 | 180          |
| 4              | 2023-02-10 | 220          |
+----------------+------------+--------------+

Calculate the average sales amount for each month.

from pyspark.sql.functions import avg, month, year

# Extract year and month from the sale_date
sales_df = sales_df.withColumn("year", year("sale_date"))
sales_df = sales_df.withColumn("month", month("sale_date"))

# Calculate the average sales amount for each month
avg_sales_df = sales_df.groupBy("year", "month").agg(avg("sales_amount").alias("average_sales"))

# Show the result
avg_sales_df.show()

Result :

+----+-----+--------------+
|year|month|average_sales |
+----+-----+--------------+
|2023| 1   |175.0         |
|2023| 2   |200.0         |
+----+-----+--------------+        

Calculate Customer Lifetime Value (CLV) considering a 5-year relationship

Transactions Table:
+----------------+-------------+--------------+
| transaction_id | customer_id | sales_amount |
+----------------+-------------+--------------+
| 1              | 1           | 200          |
| 2              | 2           | 150          |
| 3              | 1           | 180          |
| 4              | 3           | 220          |
+----------------+-------------+--------------+

Calculate the Customer Lifetime Value (CLV) for each customer. CLV is the total revenue a company expects to earn from a customer during their entire relationship.

from pyspark.sql.functions import col, count, sum

# Total sales and number of transactions for each customer
# (count must be computed inside the aggregation, while transaction_id
# is still available)
customer_clv_df = transactions_df.groupBy("customer_id").agg(
    sum("sales_amount").alias("total_sales"),
    count("transaction_id").alias("num_transactions"),
)

# Average transaction value for each customer
customer_clv_df = customer_clv_df.withColumn(
    "average_transaction_value", col("total_sales") / col("num_transactions")
)

# CLV, assuming 12 transactions per year over a 5-year relationship
customer_clv_df = customer_clv_df.withColumn(
    "clv", col("average_transaction_value") * 12 * 5
)

# Show the result
customer_clv_df.show()

Expected Result :

+-------------+-------------+------------------+---------------------------+---------+
| customer_id | total_sales | num_transactions | average_transaction_value | clv     |
+-------------+-------------+------------------+---------------------------+---------+
| 1           | 380         | 2                | 190.0                     | 11400.0 |
| 2           | 150         | 1                | 150.0                     | 9000.0  |
| 3           | 220         | 1                | 220.0                     | 13200.0 |
+-------------+-------------+------------------+---------------------------+---------+

Calculate Retention Rate = active_customers / total_customers * 100

Customers table :
+-------------+-------------+
| customer_id | signup_date |
+-------------+-------------+
| 1           | 2022-01-01  |
| 2           | 2022-02-01  |
| 3           | 2022-03-01  |
+-------------+-------------+
Transactions Table : same as the earlier example, assuming it also carries a sale_date column.

from pyspark.sql.functions import col, countDistinct, current_date, datediff, month, year

# "Active" customers per month: customers who transacted within ~1 month of
# signup (assumes transactions_df has a sale_date column, as noted above)
active_customers_df = transactions_df.join(customers_df, "customer_id")
active_customers_df = active_customers_df.withColumn("month_diff", datediff("sale_date", "signup_date") / 30)
active_customers_df = active_customers_df.filter(active_customers_df.month_diff <= 1)
active_customers_df = active_customers_df.withColumn("year", year("sale_date")).withColumn("month", month("sale_date"))
retention_rate_df = active_customers_df.groupBy("year", "month").agg(countDistinct("customer_id").alias("active_customers"))

# Total number of customers (bucketed into the current month here for simplicity)
total_customers_df = customers_df.withColumn("year", year(current_date()))
total_customers_df = total_customers_df.withColumn("month", month(current_date()))
total_customers_df = total_customers_df.groupBy("year", "month").agg(countDistinct("customer_id").alias("total_customers"))

# Retention rate = active_customers / total_customers * 100
retention_rate_df = retention_rate_df.join(total_customers_df, ["year", "month"])
retention_rate_df = retention_rate_df.withColumn("retention_rate", col("active_customers") / col("total_customers") * 100)

# Show the result
retention_rate_df.show()

        

Result :

+----+-----+----------------+----------------+-----------------+
|year|month|active_customers|total_customers |retention_rate   |
+----+-----+----------------+----------------+-----------------+
|2023|1    |3               |3               |100.0            |
|2023|2    |2               |3               |66.66666666666667|
+----+-----+----------------+----------------+-----------------+
        


Will keep on adding ... Happy learning..

