只有 E-MapReduce 产品版本 V1.3.0(包括)以上支持 Sqoop 作业类型。在低版本集群上运行 Sqoop 作业会失败,errlog 会报不支持的错误。参数细节请参见
- 单击该页右上角的创建作业,进入创建作业页面。
- 选择 Sqoop 作业类型,表示创建的作业是一个 Sqoop 作业。Sqoop 作业在 E-MapReduce 后台使用以下的方式提交:
- 在应用参数选项框中填入 Sqoop 命令后续的参数。
- 单击确定,Sqoop 作业即定义完成。
只有 E-MapReduce 产品版本 V1.3.0(包括)以上支持 Sqoop 作业类型。在低版本集群上运行 Sqoop 作业会失败,errlog 会报不支持的错误。参数细节请参见
等)来分析和处理自己的数据。用户还可以通过E-MapReduce将数据非常方便的导入和导出到阿里云其他的云数据存储系统和数据库系统中,如阿里云 OSS、阿里云 RDS 等。
大数据计算服务(MaxCompute,原名ODPS)是一种快速、完全托管的TB/PB级数据仓库解决方案。MaxCompute向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题,有效降低企业成本,并保障数据安全。
数据工场DataWorks (原大数据开发套件Data IDE) 是基于MaxCompute作为计算和存储引擎的用于工作流可视化开发和托管调度运维的海量数据离线加工分析平台,支持按照时间和依赖关系的任务全面托管调度,支持每日千万级别的任务按照DAG关系准确、准时运行,提供可视化的任务监控管理工具,支持以 DAG 图的形式展示任务运行时的全局情况等
在文档使用中是否遇到以下问题
感谢您的打分,是否有意见建议想告诉我们?
感谢您的反馈,反馈我们已经收到
Sqoop项目开始于2009年,最早是作为Hadoop的一个第三方模块存在,后来为了让使用者能够快速部署,也为了让开发人员能够更快速的迭代开发,Sqoop独立成为一个项目。基本上所有的企业重要的数据都存储在关系型数据库中。
,Postgres等)中的数据通过MapReduce作业一行行抽取然后导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。
导入之前,sqoop使用jdbc检查导入的表检索出列字段和字段类型,数据库类型会通过java类来映射到java数据类型,在mapreduce应用中通过对应的java类型来保存该字段的值(sqoop代码生产器使用这些信息来创建对应表的类,用于保存从表中抽象的记录)。sqoop启动的mapreduce作业用到一个InputFormat,他可以通过jdbc从一个数据库表中读取部分内容,hadoop提供的DataDrivernDBInputFormat能够为几个map任务对查询结果进行划分(通过划分列--splitting column来进行划分,根据表中元数据,sqoop选择合适的列(这个列通常是主键)进行划分,该列的最大值、最小值会被读到,然后根据map进行分组,确定每组的需要查询的数量),在生产反序列化代码和配置inputFormat后,sqoop将作业发送到mapreduce集群,map任务执行查询并且将resultset中的数据反序列化到生成类的实例,这些数据要么被直接被保存在sequenceFile文件中,要么写在HDFS之前被转换成分割的文本。为了解决大对象问题,sqoop将导入的大对象数据存储在LobFile(LobFile格式文件使用了64位的地址空间,故能够存储一个非常大的单条记录)格式的单独文件中。LobFile文件允许客户端持有对该记录的引用,而不访问记录内容,对记录的访问时通过InputStream、Reader来实现的。在一个map任务中,BlobRef和clobRef类会缓存对底层LobFile文件的引用,然后通过blobRef.getDateStream()来获取数据。
在执行导出之前,sqoop会根据数据库连接字符来选择一个导出的方法(大多会选择jdbc,会生成一批insert语句,每条语句会向目标表中插入多条记录,而不是仅仅一条),sqoop会根据目标表的定义生成一个java类(它可以从生成的文本中解析出几录,向目标表中插入合适类型的值),然后会启动一个mapreduct作业,从hdfs中读取源数据文件,使用生成的类解析出记录,并且执行选定的方法。
对于mysql数据库来说可以使用mysqlimport的直接模式方法。每个map任务会生成一个mysqlimport进程, 该进程通过本地文件系统上一个命名FIFO通道进行通信,数据通过这个通道流入mysqlimport,然后写入数据库
1、默认的并行机制要小心:默认情况下的并行意味着Sqoop假设大数据是在分区范围内均匀分布的。这在当你的源系统是使用一个序列号发生器来生成主键的时候工作得很好。打个比方,当你有一个10个节点的集群,那么工作负载是在这10台上平均分配的。但是,如果你的分割键是基于字母数字的,拥有比如以“A”作为开头的键值的数量会是“M”作为开头键值数量的20倍,那么工作负载就会变成从一台服务器倾斜到另一台服务器上。
2、导入过程中默认分割符为“,”,如果字段内容中有逗号,那么需要指定分割符。
3、可以指定文件格式,默认为文本格式并不能保存数据库中类型为varbinary的列,所以需要指定文件格式为二进制文件(对应sequenceFile格式--仅仅支持java语言【序列化与反序列化时候会使用到自动生成的java类】和Avro格式--可以被多种语音处理如pyson),有时文本格式在区分null值和字符串null时可以出现问题(文本格式可以用--null-string '\\N'
4、可以使用where子句来限定导入的记录,而不需到导入全量数据,每天的增量数据可以增加日期限定。
5、需要保证在执行导入HDFS数据时,源表不能进行修改,否则会出现数据不一致。
6、在使用直接模式(用--direct)(优点是快速、大吞吐率,但是没有jdbc模式通用)导入数据时,需要针对每个数据库进行具体处理,如源数据库是mysql,mysql的mysqldump并不支持大对象数据--如clob、blob,需要jdbc专用的api对这些列单独处理
1、使用--hive -import参数可以从源数据库中直接将数据载入hive
指定JDBC连接字符串 |
指定要使用的连接管理器类 |
指定要使用的JDBC驱动类 |
设置用于存放认证的密码信息文件的路径 |
从控制台读取输入的密码 |
可选,指定存储数据库连接参数的属性文件 |
将数据追加到HDFS上一个已存在的数据集上 |
将数据导入到Avro数据文件 |
将数据导入到普通文本文件(默认) |
边界查询,用于创建分片(InputSplit) |
从表中导出指定的一组列的数据 |
如果指定目录存在,则先删除掉 |
使用直接导入模式(优化导入速度) |
分割输入stream的字节大小(在直接导入模式下) |
从数据库中批量读取记录数 |
设置内联的LOB对象的大小 |
使用n个map任务并行导入数据 |
指定按照哪个列去分割数据 |
导入HDFS的目标路径 |
HDFS存放表的根路径 |
指定导出时所使用的查询条件 |
果指定列为字符串类型,使用指定字符串替换值为null的该类列的值 |
如果指定列为非字符串类型,使用指定字符串替换值为null的该类列的值 |
启用数据副本验证功能,仅支持单表拷贝,可以指定验证使用的实现类 |
指定验证门限所使用的类 |
使用直接导出模式(优化速度) |
导出过程中HDFS源路径 |
使用n个map任务并行导出 |
导出数据调用的指定存储过程名 |
更新参考的列名称,多个列名使用逗号分隔 |
使用指定字符串,替换字符串类型值为null的列 |
使用指定字符串,替换非字符串类型值为null的列 |
在数据导出到数据库之前,数据临时存放的表名称 |
清除工作区中临时存放的数据 |
5、可以使用临时阶段表来存储,导出过程中的中间结果--clear staging -table
序:map客户端使用jdbc向数据库发送查询语句,将会拿到所有数据到map的客户端,安装jdbc的原理,数据全部缓存在内存中,但是内存没有出现爆掉情况,这是因为1.3以后,对jdbc进行了优化,改进jdbc内部原理,将数据写入磁盘存储了。
Sqoop是apache旗下一款“Hadoop和关系数据库服务器之间传送数据”的工具。Sqoop架构非常简单,其整合了Hive、Hbase和Oozie,通过map-reduce任务来传输数据,从而提供并发特性和容错。
导出数据:从Hadoop的文件系统中导出数据到关系数据库mysql等。
从RDBMS导入单个表到HDFS。表中的每一行被视为HDFS的记录。所有记录都存储为文本文件的文本数据(或者Avro、sequence文件等二进制数据)
如果成功执行,那么会得到下面的输出。
Sqoop的import工具会运行一个MapReduce作业,该作业会连接MySql数据库并读取表中的数据。默认情况下,该作业会并行使用4个map任务来加速导入过程,每个任务都会将其所导入的数据写到一个单独的文件,但所有4个文件都位于同一个目录中。这里我们只使用一个map(-m 1),这样我们只得到一个保存在hdfs中的文件。
查看HDFS导入的数据,intsmaze表的数据和字段之间用逗号(,)表示。
默认情况下,Sqoop会将我们导入的数据保存为逗号分隔的文本文件。如果导入数据的字段内容存在逗号分隔符,我们可以另外指定分隔符,字段包围字符和转义字符。使用命令行参数可以指定分隔符,文件格式,压缩等。支持文本文件(--as-textfile)、avro(--as-avrodatafile)、SequenceFiles(--as-sequencefile)。默认为文本。
使用一个简单的查询通常就可以读取一张表的内容
但是为了更好的导入性能,可以将查询划分到多个节点上执行。查询时根据一个划分列(确定根据哪一个列划分)来进行划分。根据表中的元数据,Sqoop会选择一个合适的列作为划分列(通常是表的主键)。主键列中的最小值和最大值会被读出,与目标任务数一起来确定每个map任务要执行的查询。当然用户也可以使用split-by参数自己指定一个列作为划分列。
注意:划分列的选择是影响并行执行效率的重要因素。如果id列的值不是均匀分布的(比如id值从2000到4000的范围是没有记录的),那么有一部分map任务可能只有很少或没有工作要做,而其他任务则有很多工作要做。
严重注意:在1.3之前,map的并行度一定要设置好,因为map客户端会向数据库发送查询语句,将会拿到所有数据到map的客户端缓存到,然后在执行map()方法一条一条处理,所有如果设置不好,一个map拿到的表数据过大就会内存溢出,毕竟里面是用jdbc去获取的,所有数据都装在jdbc的对象中,爆是必然的。在1.3以后改写jdbc的内部原理,拿到一条数据就写入硬盘中,就没有内存溢出了。
Sqoop不需要每次都导入整张表。例如,可以指定仅导入表的部分列。用户也可以在查询中加入where子句,来限定需要导入的记录。例如,如果上个月已经将id为0~9999的记录导入,而本月新增了1000条记录,那么在导入时的查询语句中加入子句where id>=10000,来实现只导入所有新增的记录。
下面的语法用于Sqoop导入命令增量选项。
假设新添加的数据转换成intsmaze表如下:
下面的命令用于在intsmaze表执行增量导入。
执行增量导入时,则会在hdfs上默认路径下新增一个文件来存储导入的新增数据,如上面的part-m-00001。
--m 2 则会报该路径下的目录已经存在错误,即无法执行成功。
在使用Sqoop导入表数据到HDFS,我们可以指定目标目录。
实际场景的分析:我一开始担心在导入增量数据时,数据文件的位置等问题,想过通过每次执行增量导入时来根据时间作为文件名来指定每一次导入时文件存储在hdfs上的路径来解决。现在看来不需要担心这个问题了。但是考虑这样一种情况:关系库中的某张表每天增量导入到hdfs上,然后使用hive对导入的数据加载进hive表时,我们不应该每次都情况hive表再进行全局导入hive,这样太耗费效率了。当然可以根据文件的生成时间来确定每次把那个文件导入到hive中,但是不便于维护,可以直接根据目录名来导入该目录下的数据到hive中,且导入到hive中的数据可以按天设置分区,每次导入的数据进入一个新的分区。
有些业务场景只需要对hive表中每天新增的那些数据进行etl即可,完全没有必要每次都是将整个hive表进行清理,那么可以结合hive的分区,按天进行分区,这样每次进行etl处理就处理那一个分区数据即可。当然有些数据比如两表的join操作,则必须对全表进行处理,那么在join时不限制分区即可,数据倒入时仍然时间分区装载数据。
绝对可以向hive增量导入数据的,只是不知道内部原理即从hdfs到hive这一过程。
Sqoop导入"where"子句的一个子集。它执行在各自的数据库服务器相应的SQL查询,并将结果存储在HDFS的目标目录。
where子句的语法如下。
导入intsmaze表数据的子集。子集查询检所有列但是居住城市为:sec-bad
$CONDITIONS参数是固定的,必须要写上。
增量添加数据进hdfs
我们查看hdfs上的数据
MySQL(或者别的RDBMS)导入数据到hdfs后会发现原来在mysql中字段值明明是NULL, 到Hive查询后 where field is null 会没有结果呢,然后通过检查一看,NULL值都变成了字段串'null'。其实你在导入的时候加上以下两个参数就可以解决了,
这里要注意一点。在hive里面。NULL是用\N来表示的。你可以自己做个实验 insert overwrite table tb select NULL from tb1 limit 1;然后在去查看原文件就可以发现了。多提一点,如果在导入后发现数据错位了,或者有好多原来有值的字段都变成了NULL,