SAS: Eine CSV-Datei auf effiziente Weise in Hive anlegen

Aus BI-Snippets - Business Intelligence Code und Module
Wechseln zu: Navigation, Suche

SAS bietet die Möglichkeit, einfach mit Hadoop zu arbeiten. Dazu kann man Libname Statements auf ein Schema in Hive abzusetzen. Dann ist es beispielsweise möglich, Hive-Tabellen mit einem einfachen DATA STEP oder PROC SQL anzulegen.

Handelt es sich bei den auf Hive anzulegenden Daten jedoch um eine CSV- oder TXT-Datei, so gibt es effizientere Wege, dies zu erreichen.

Natürlich ist es auch hier möglich, die CSV- oder TXT-Datei zuerst in SAS zu importieren, um sie anschließend auf gewohntem Weg mit einem DATA STEP auf Hive zu übertragen. Allerdings benötigt man zum effizienten Importieren von solchen Dateien möglichst genaue Informationen. Dazu gehören insbesondere die Längen der einzelnen Felder. Erfahrungsgemäß ist es aber nicht so, dass der Data Owner unserer CSV-Datei wirklich weiß, wie lang ein Eintrag in einem Feld maximal werden kann.

Das ist ein Problem für SAS. Denn einen DATA STEP zum Importieren der CSV-Datei zu verwenden ist somit nicht praktikabel. Dafür muss man die Länge der Felder kennen.

Ein PROC IMPORT kommt aber aus Performancegründen auch nicht infrage. Um sicher zu gehen, dass SAS die richtige Länge der Felder ermittelt, muss GUESSINGROWS verwendet werden. Und das auch noch mit der Option MAX. Wenn wir hier von Dateien mit dutzenden von Spalten und mehreren Millionen Datensätzen reden, dauert das ewig.

Es hilft zu wissen, dass ein Hive Table nichts anderes als eine CSV-Datei ist. Zudem ist es möglich, jede Art von Datei auf HDFS abzulegen.

Mit PROC HADOOP kann SAS einige (nicht alle) HDFS commands absetzen. Glücklicherweise gehört dazu die Funktion copyfromlocal. Wie der Name verrät, kann man damit Dateien von einem lokalen Verzeichnis in ein Verzeichnis auf HDFS kopieren.

Das sieht folgendermaßen aus:

<syntaxhighlight lang="SAS" line="5" >

proc hadoop;

 hdfs copyfromlocal="C:\Pfad\Datei.csv"
 out="/ein/verzeichnis/in/hdfs/Datei.csv"
 ;

run;

</syntaxhighlight>

Man sollte zumindest die Spaltennamen der CSV-Datei kennen. Auch die Datentypen können nicht schaden. Wenn man diese nicht kennt, so kann man auch jedes Feld als String anlegen.

<syntaxhighlight lang="SAS" line="1">

proc SQL noprint;
connect to hadoop(server='xxx' port=10000 schema=xxx SUBPROTOCOL=hive2 sql_functions=all);

execute (create table schema.Table1 ( Feld1 INT

        ,Feld2 STRING
        ,Feld3 DOUBLE

) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES("separatorChar"=";","serialization.encoding"="UTF-8") STORED AS TEXTFILE /* Es wird angenommen, dass die erste Zeile eine Header-Zeile ist */ TBLPROPERTIES ("skip.header.line.count"="1","serialization.null.format"="") ) by hadoop; disconnect from hadoop;

quit;

</syntaxhighlight>

Wie man sieht ist dieses SQL etwas ungewohnt. Das Create an sich dürfte bekannt sein. Es ist hier lediglich zu beachten, dass Hive, genauso wie SAS, weder Blanks noch Umlaute oder andere Sonderzeichen in seinen Feldnamen mag. Aber wenden wir uns dem Code danach zu:

<syntaxhighlight lang="SAS" line="1">

 ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
 WITH SERDEPROPERTIES("separatorChar"=";","serialization.encoding"="UTF-8")

</syntaxhighlight>

Diese Zeile bedeutet, dass wir uns um die Enkodierung der CSV-Datei kümmern. SERDE definiert den Serializer und hier kann auch eine Enkodierung für die Quell-CSV angegeben werden. Dies wären zum Beispiel UTF-8 (offensichtlich für utf-8), ISO-8859-1 (für WLATIN1) oder WINDOWS-1252 (für ANSI). Des Weiteren gibt es die Möglichkeit einen Delimiter für die Datei anzugeben.

Interessant ist auch diese Zeile: <syntaxhighlight lang="SAS" line="1">

 TBLPROPERTIES ("skip.header.line.count"="1","serialization.null.format"="")

</syntaxhighlight>

In einer CSV-Datei stehen meist die Feldnamen in der ersten Zeile. Dies wird Hive hier mitgeteilt. Außerdem sollen NULL-Werte hier als Nichts dargestellt werden.

Anschließend kann man die so erstellte leere Hive-Tabelle mit der zuvor auf HDFS abgelegten CSV-Datei befüllen.

<syntaxhighlight lang="SAS" line="1">

proc SQL noprint;
connect to hadoop(server='xxx' port=10000 schema=xxx SUBPROTOCOL=hive2 sql_functions=all);

execute (

         LOAD DATA INPATH '/ein/verzeichnis/in/hdfs/Datei.csv' OVERWRITE INTO TABLE schema.Table1

) by hadoop;

    disconnect from hadoop;
quit;

</syntaxhighlight>

LOAD DATA INPATH macht dabei einen Move. Das heißt die CSV in HDFS existiert danach nicht mehr. Prinzipiell ist man nun fertig. Es wurde erfolgreich ein Hive Table aus einer CSV-Datei angelegt.

Aber diese Tabelle hat die angegebene Enkodierung der ursprünglichen CSV-Datei. Hive Tables sind immer UTF-8-enkodiert. Außerdem kann es zu Problemen kommen, wenn anschließend beispielsweise die Tabelle mit Spark weiter verarbeitet werden soll. Spark versteht nämlich nicht, was wir Hive vorher mitgeteilt haben. Nämlich, dass die erste Zeile nur die Feldnamen enthält.

Aber auch hier kann man sich helfen.

<syntaxhighlight lang="SAS" line="1">

proc SQL noprint;
connect to hadoop(server='xxx' port=10000 schema=xxx SUBPROTOCOL=hive2 sql_functions=all);

execute (create table schema.temp ROW FORMAT DELIMITED FIELDS TERMINATED BY ';' STORED AS TEXTFILE TBLPROPERTIES ("serialization.null.format"="") as select * from schema.Table1 ) by hadoop; disconnect from hadoop; quit;

</syntaxhighlight>

Man erstellt hier eine temporäre Tabelle aus der zuvor erzeugten. Hive macht nun die ganze Arbeit für uns. Die Header-Zeile existiert nicht mehr. Außerdem wird automatisch die korrekte Hive-UTF-8-Enkodierung angewendet.

Nun dropt man die Ursprungstabelle und benennt die temporäre Tabelle einfach wieder so wie vorher.

<syntaxhighlight lang="SAS" line="1">

proc SQL noprint;

connect to hadoop(server='xxx' port=10000 schema=xxx SUBPROTOCOL=hive2 sql_functions=all);

execute ( DROP TABLE schema.Table1 ) by hadoop;

disconnect from hadoop; quit;

proc SQL noprint;

connect to hadoop(server='xxx' port=10000 schema=xxx SUBPROTOCOL=hive2 sql_functions=all);

execute ( ALTER TABLE schema.temp RENAME TO schema.Table1 ) by hadoop;

disconnect from hadoop; quit;

</syntaxhighlight>

Fertig.