自动提交Spark-SQL执行

本文遵循BY-SA版权协议,转载请附上原文出处链接。


本文作者: 黑伴白

本文链接: http://heibanbai.com.cn/posts/6d4753fc/

自动提交Spark-SQL执行

说明

核心主程序为python脚本, 然后shell脚本中通过spark-submit命令提交主python脚本作业到集群中执行sql文件中的各条sql语句:

  • python脚本: 主程序, 按照分号拆分SQL文件中的SQL语句, 并执行
  • shell脚本: 执行入口, 通过spark-submit提交python到集群执行, 最后获取yarn日志

程序存放结构:

  • exec_spark_sql

    • bin

      • run_spark_sql.py(执行主程序)
    • conf

      • principal.conf(租户/认证文件/队列信息)

        1
        2
        3
        principal=test
        keyTab=/data/key/user.keytab
        queue=q_user
    • sql

      • test.sql(要执行的sql文件, 每条语句分号”;”分隔)
    • exec_spark_sql.sh(执行入口程序)

shell脚本

脚本名称: exec_spark_sql.sh

脚本参数: SQL文件绝对路径(例: /tmp/test.sql)

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#!/bin/bash

# 参数 SQL文件绝对路径
sqlFile=$1
# 提交集群后显示的作业名 "脚本名称_SQL文件名称"
jobName=`basename $0`_`basename $1`
# 当前日期
currDate=`date +"%Y%m%d"`
# 脚本所在路径
shellDir=$(cd "$(dirname $0)"; pwd)
# 配置文件
confFile=${shellDir}/conf/principal.conf

# 检查配置文件
if [ ! -f ${confFile} ];then
echo "配置文件${confFile}不存在!"
exit 1
fi
# 检查SQL文件
if [ ! -f ${sqlFile} ];then
echo "SQL文件${sqlFile}不存在!"
exit 1
fi

# 租户
principal=$(cat ${confFile} |grep -i principal |awk -F'=' '{print $2}')
# 认证文件
keyTab=$(cat ${confFile} |grep -i keyTab |awk -F'=' '{print $2}')
# 队列
queue=$(cat ${confFile} |grep -i queue |awk -F'=' '{print $2}')

# 日志目录
logDir="/tmp/log/exec_spark_sql/${currDate}"
# 日志文件
logFile="${logDir}/${jobName}_`date +"%Y%m%d%H%M%S"`.log"

# 检查目录
CheckDir()
{
if [ -d $1 ];then
echo "目录$1存在."
else
echo "目录$1不存在, 开始创建:mkdir -p -m 775 $1"
mkdir -p -m 775 $1
if [ $? -ne 0 ];then
echo "目录创建失败!"
exit 1
else
echo "目录创建成功."
fi
fi
}

# 记录日志
Log()
{
createTime=`date +"%Y-%m-%d %H:%M:%S"`
echo -e "[${createTime}] $*" |tee -a ${logFile}
}

# 检查日志目录是否存在
CheckDir "${logDir}"

# 程序执行开始时间
shell_start_date=$(date +%s)
Log "Shell_name:$0"
Log "Shell_para:$@"
# 认证
kinit -kt ${keyTab} ${principal}

# spark-submit 队列参数
exec_cmd="spark-submit --master yarn --deploy-mode cluster \
--driver-memory 4G --executor-memory 8G --excutor-cores 4 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerialize \
--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.cachedExecutorIdleTimeout=214783647s \
--conf spark.dynamicAllocation.minExecutors=3 \
--conf spark.dynamicAllocation.maxExecutors=500 \
--conf spark.sql.adaptive.enabled=true \
--conf spark.shuffle.service.enabled=true \
--conf spark.kryoserializer.buffer.max=128MB \
--conf spark.executor.memoryOverhead=4096M \
--conf spark.yarn.maxAppAttempts=1 \
--conf yarn.resourcemanager.am.max-attempts=1 \
--principal ${principal} \
--keytab ${keyTab} \
--name ${jobName} \
--files ${sqlFile} \
${shellDir}/bin/run_spark_sql.py ${jobName} ${sqlFile} ${currDate}"

Log "程序开始执行:${shellDir}/bin/run_spark_sql.py ${jobName} ${sqlFile} ${currDate}"
Log "提交命令如下:"
echo "${exec_cmd}"|tee -a ${logFile}
# Spark job执行开始时间
spark_start=$(date +%s)
res=$(${exec_cmd} 2>&1)
# 获取执行状态
resCode=$?
# Spark job执行结束时间
spark_end=$(date +%s)
# Spark job执行耗时
time=`expr ${spark_end} - ${spark_start}`
Log "Spark job执行完成, 耗时:${time}秒"
# 判断执行是否成功
if [ ${resCode} -ne 0 ];then
Log "Spark job执行失败!"
else
Log "Spark job执行成功."
fi

# 获取日志信息 获取appid
appid=$(echo "${res}" |grep -i 'proxy/application' |grep -o -E "application_[0-9]+_[0-9]+" |head -n 1)
Log "appid:${appid}"
if [ "X${appid}" == "X" ];then
Log "applicationId获取失败, 执行信息如下:"
Log "res:${res}"
else
Log "Spark job的applicationId是:${appid}"
# 解析yarn日志 获取日志内容
Log "==============================Yarn日志=============================="
printFlag=0
yarn logs --applicationId ${appid} | while read line
do
if [[ ${line} =~ ^'LogType:stdout.ext'$ ]];then
printFlag=1
fi
if [[ ${line} =~ ^'End of LogType:stdout.ext'$ ]];then
printFlag=0
fi
if [[ ${printFlag} == 1 ]];then
echo "${line}"|tee -a ${logFile}
fi
done
Log "End of LogType:stdout.ext"
fi

# 程序执行结束时间
shell_end_date=$(date +%s)
# 执行耗时
shell_time=`expr ${shell_start_time} - ${shell_end_time}`

Log "Spark job执行耗时:${time}秒"
Log "程序运行结束, 返回状态为${resCode}, 总运行耗时:${shell_time}秒"
exit ${resCode}

python脚本

脚本名称: run_spark_sql.py

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# -*- coding:utf-8 -*-

from pyspark.sql import SparkSession
import sys,os
import time
import re

def main(job_name,sqlfile,job_date):
# 获取sql文件名
sql_file_name = os.path.split(sqlfile)[1]

# 创建SparkSession
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
sc = spark.sparkContext

# 读取sql文件
print("sql_file_name:"+sql_file_name)
print("SPARK_YARN_STAGING_DIR:"+os.getenv('SPARK_YARN_STAGING_DIR'))
sql_content = '\n'.join(sc.textFile(os.getenv('SPARK_YARN_STAGING_DIR')+"/"+sql_file_name).collect())
# 使用正则拆分SQL语句
sql_statements = re.split(r';\s*\n', sql_content)

# 执行每个SQL 并打印执行时间
sql_start_time = time.time()
for index, sql in enumerate(sql_statements):
sql = sql.strip()
if not sql:
continue
start_time = time.time()
print(f"开始执行SQL {index + 1}: {sql}")
spark.sql(sql).show()
end_time = time.time()
print(f"SQL {index + 1} 耗时: {end_time - start_time:.2f} 秒")
sql_end_time = time.time()
print(f"SQL ALL 耗时: {sql_end_time - sql_start_time:.2f} 秒")
spark.stop()

if __name__ == "__main__":
print("Exec run_spark_sql.py")
# 解析参数
if len(sys.argv) < 4 :
print("传入参数错误, 用法: spark-submit run_spark_sql.py job_name sqlfile job_date")
sys.exit(-1)
job_name = sys.argv[1]
sqlfile = sys.argv[2]
job_date = sys.argv[3]
print("程序接收到的参数为: job_name[" + job_name + "] sqlfile[" + sqlfile + "] job_date[" + job_date + "]")
main(job_name, sqlfile, job_date)

蚂蚁🐜再小也是肉🥩!


自动提交Spark-SQL执行
http://heibanbai.com.cn/posts/6d4753fc/
作者
黑伴白
发布于
2024年8月6日
许可协议

“您的支持,我的动力!觉得不错的话,给点打赏吧 ୧(๑•̀⌄•́๑)૭”

微信二维码

微信支付

支付宝二维码

支付宝支付