splicemachine.spark package

Submodules

splicemachine.spark.context module

This module contains the classes for interacting with the database via the Splice Machine Native Spark DataSource (NSDS). For installation instructions, please see the Getting Started guide. For use inside the K8s cluster, see PySpliceContext. For use outside of the K8s cluster, see ExtPySpliceContext.

Copyright 2021 Splice Machine, Inc.

Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

class ExtPySpliceContext(sparkSession, JDBC_URL=None, kafkaServers='localhost:9092', kafkaPollTimeout=20000, _unit_testing=False)

Bases: splicemachine.spark.context.PySpliceContext

This class implements a SplicemachineContext object from com.splicemachine.spark2 for use outside of the K8s Cloud Service

analyzeSchema(schema_name)

Analyze the schema, collecting statistics for all of its tables.

Parameters

schema_name – (str) Name of the schema for which statistics will be collected

Returns

None

analyzeTable(schema_table_name, estimateStatistics=False, samplePercent=10.0)

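Collect statistics on a table.

Parameters
  • schema_table_name – (str) Full table name in the format of ‘schema.table’

  • estimateStatistics – (bool) If True, estimate the statistics from a sample of the rows instead of scanning every row. Default False

  • samplePercent – (float) The percentage of rows to be sampled when estimateStatistics is True. Default 10.0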

Returns

None
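The way estimateStatistics and samplePercent combine can be sketched as the SQL statement they are assumed to produce. build_analyze_table_sql is a hypothetical helper, not part of splicemachine.spark, and the ANALYZE TABLE ... ESTIMATE STATISTICS SAMPLE ... PERCENT syntax is an assumption based on Splice Machine's SQL dialect, so check the SQL reference before relying on it.

```python
def build_analyze_table_sql(schema_table_name: str,
                            estimate_statistics: bool = False,
                            sample_percent: float = 10.0) -> str:
    """Hypothetical helper: build the ANALYZE TABLE statement that the
    analyzeTable parameters are assumed to map onto."""
    if not 0 < sample_percent <= 100:
        raise ValueError("sample_percent must be in the range (0, 100]")
    sql = f"ANALYZE TABLE {schema_table_name}"
    if estimate_statistics:
        # Estimation reads only a sample of the rows; a full analyze reads them all.
        sql += f" ESTIMATE STATISTICS SAMPLE {sample_percent} PERCENT"
    return sql


full = build_analyze_table_sql("SPLICE.ORDERS")
sampled = build_analyze_table_sql("SPLICE.ORDERS", estimate_statistics=True, sample_percent=5.0)
print(full)     # ANALYZE TABLE SPLICE.ORDERS
print(sampled)  # ANALYZE TABLE SPLICE.ORDERS ESTIMATE STATISTICS SAMPLE 5.0 PERCENT
```

On a live context you would simply call splice.analyzeTable('SPLICE.ORDERS', estimateStatistics=True, samplePercent=5.0).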

autoCommitting()

Check whether auto-commit is on.

Returns

(Boolean) True if auto-commit is on.

bulkImportHFile(dataframe, schema_table_name, options)

Bulk import a DataFrame into a schema.table using HFiles

Parameters
  • dataframe – (DataFrame) Input data

  • schema_table_name – (str) Full table name in the format of “schema.table”

  • options – (Dict) Dictionary of options to be passed to --splice-properties; bulkImportDirectory is required

Returns

(int) Number of records imported
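Because bulkImportDirectory is the one option documented as required, a caller may want to validate the options dictionary before starting a bulk import. The sketch below uses a hypothetical helper, check_bulk_import_options, and a made-up example directory; converting values to strings is an assumption. The commented-out call uses the documented bulkImportHFile signature and needs a live context, so it is not executed.

```python
from typing import Dict


def check_bulk_import_options(options: Dict[str, object]) -> Dict[str, str]:
    """Hypothetical pre-flight check for the `options` argument of
    bulkImportHFile / bulkImportHFileWithRdd.

    Only the documented requirement is enforced: bulkImportDirectory must be
    present and non-empty. Stringifying the values is an assumption.
    """
    directory = options.get("bulkImportDirectory")
    if not directory:
        raise ValueError("options must include a non-empty 'bulkImportDirectory'")
    return {key: str(value) for key, value in options.items()}


opts = check_bulk_import_options({"bulkImportDirectory": "/tmp/bulk_import"})
print(opts)  # {'bulkImportDirectory': '/tmp/bulk_import'}

# With a live context (not run here):
# imported = splice.bulkImportHFile(df, "SPLICE.ORDERS", opts)
```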

bulkImportHFileWithRdd(rdd, schema, schema_table_name, options)

Bulk import an RDD into a schema.table using HFiles

Parameters
  • rdd – (RDD) Input data

  • schema – (StructType) The schema of the rows in the RDD

  • schema_table_name – (str) Full table name in the format of “schema.table”

  • options – (Dict) Dictionary of options to be passed to --splice-properties; bulkImportDirectory is required

Returns

(int) Number of records imported

columnNamesCaseSensitive(caseSensitive)

Sets whether column names should be treated as case sensitive.

Parameters

caseSensitive – (boolean) True for case sensitive, False for not case sensitive

commit()

Commit the transaction. Throws an exception if auto-commit is on.

Returns

None
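commit and rollback follow ordinary JDBC transaction semantics. As a runnable analogy only (this is Python's built-in sqlite3 module, not the PySpliceContext API), the sketch below opens explicit transactions, which play the role that setAutoCommitOff() plays on a context: committed work persists, rolled-back work is discarded.

```python
import sqlite3

# isolation_level=None disables sqlite3's implicit transactions, so each
# statement auto-commits unless a transaction is opened explicitly.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v TEXT)")

conn.execute("BEGIN")                          # auto-commit effectively off
conn.execute("INSERT INTO t VALUES (1, 'a')")
conn.execute("COMMIT")                         # analogous to splice.commit()

conn.execute("BEGIN")
conn.execute("INSERT INTO t VALUES (2, 'b')")
conn.execute("ROLLBACK")                       # analogous to splice.rollback()

rows = conn.execute("SELECT id, v FROM t ORDER BY id").fetchall()
print(rows)  # [(1, 'a')]
conn.close()
```

On a PySpliceContext the corresponding sequence is splice.setAutoCommitOff(), the writes, then splice.commit() or splice.rollback(); both methods throw an exception while auto-commit is still on.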

createAndInsertTable(dataframe, schema_table_name, primary_keys=None, create_table_options=None, to_upper=True)

Creates a schema.table (schema_table_name) from a dataframe and inserts the dataframe into the table

Parameters
  • dataframe – (DataFrame) The Spark DataFrame to base the table on

  • schema_table_name – (str) The schema.table to create

  • primary_keys – (List[str]) The primary keys. Default None

  • create_table_options – (str) Additional table-level SQL options. Default None

  • to_upper – (bool) Whether the dataframe columns should be converted to uppercase before table creation. If False, the table will be created with lowercase columns. Default True

Returns

None

createDataFrame(rdd, schema)

Creates a dataframe from a given rdd and schema.

Parameters
  • rdd – (RDD) Input data

  • schema – (StructType) The schema of the rows in the RDD

Returns

(DataFrame) The Spark DataFrame

createTable(dataframe, schema_table_name, primary_keys=None, create_table_options=None, to_upper=True, drop_table=False)

Creates a schema.table (schema_table_name) from a dataframe

Parameters
  • dataframe – (DataFrame) The Spark DataFrame to base the table on

  • schema_table_name – (str) The schema.table to create

  • primary_keys – (List[str]) The primary keys. Default None

  • create_table_options – (str) Additional table-level SQL options. Default None

  • to_upper – (bool) Whether the dataframe columns should be converted to uppercase before table creation. If False, the table will be created with lowercase columns. Default True

  • drop_table – (bool) Whether to drop the table if it exists. Default False. If False and the table exists, the function will throw an exception

Returns

None
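To illustrate how primary_keys, to_upper and create_table_options shape the resulting table, here is a minimal sketch of a DDL builder. build_create_table_sql is hypothetical: it takes (name, SQL type) pairs instead of a Spark StructType to stay self-contained, and appending create_table_options after the column list is an assumption, not the library's confirmed behaviour.

```python
from typing import List, Optional, Sequence, Tuple


def build_create_table_sql(schema_table_name: str,
                           columns: Sequence[Tuple[str, str]],
                           primary_keys: Optional[List[str]] = None,
                           create_table_options: Optional[str] = None,
                           to_upper: bool = True) -> str:
    """Hypothetical DDL builder illustrating primary_keys, to_upper and
    create_table_options; not the library's actual implementation."""

    def ident(name: str) -> str:
        # Splice Machine, like standard SQL, folds unquoted identifiers to
        # upper case, so lower-case names must be double-quoted to survive.
        return name.upper() if to_upper else f'"{name.lower()}"'

    col_defs = [f"{ident(name)} {sql_type}" for name, sql_type in columns]
    if primary_keys:
        col_defs.append(f"PRIMARY KEY({', '.join(ident(k) for k in primary_keys)})")
    sql = f"CREATE TABLE {schema_table_name} ({', '.join(col_defs)})"
    if create_table_options:
        # Assumed placement: options appended after the column list.
        sql += f" {create_table_options}"
    return sql


cols = [("id", "INTEGER"), ("name", "VARCHAR(50)")]
upper_ddl = build_create_table_sql("SPLICE.PEOPLE", cols, primary_keys=["id"])
lower_ddl = build_create_table_sql("SPLICE.PEOPLE", cols, primary_keys=["id"], to_upper=False)
print(upper_ddl)  # CREATE TABLE SPLICE.PEOPLE (ID INTEGER, NAME VARCHAR(50), PRIMARY KEY(ID))
print(lower_ddl)  # CREATE TABLE SPLICE.PEOPLE ("id" INTEGER, "name" VARCHAR(50), PRIMARY KEY("id"))
```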

createTableWithSchema(schema_table_name, schema, keys=None, create_table_options=None)

Creates a schema.table from a schema

Parameters
  • schema_table_name – (str) The schema.table to create

  • schema – (StructType) The schema that describes the columns of the table

  • keys – (List[str]) The primary keys. Default None

  • create_table_options – (str) The additional table-level SQL options. Default None

Returns

None

delete(dataframe, schema_table_name)

Delete the rows of a table whose primary keys match those in the dataframe (a join on the primary key columns). Be careful with column naming and case sensitivity.

Parameters
  • dataframe – (DataFrame) The dataframe containing the primary keys of the rows you would like to delete

  • schema_table_name – (str) Splice Machine table, in the format of “schema.table”

Returns

None
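The primary-key join that delete performs can be modelled in plain Python, which also shows why column naming matters: the lookup is exact, so a lower-case id column will not match a table key named ID. delete_by_primary_key is a hypothetical helper operating on lists of dicts, not the Spark implementation.

```python
from typing import Dict, List, Sequence

Row = Dict[str, object]


def delete_by_primary_key(table: List[Row], to_delete: List[Row],
                          key_cols: Sequence[str]) -> List[Row]:
    """Keep only the table rows whose primary key is absent from to_delete.

    Column lookups are exact, so a column named 'id' will not match a key
    column named 'ID' (a KeyError is raised instead).
    """
    doomed = {tuple(row[c] for c in key_cols) for row in to_delete}
    return [row for row in table if tuple(row[c] for c in key_cols) not in doomed]


table = [{"ID": 1, "NAME": "a"}, {"ID": 2, "NAME": "b"}, {"ID": 3, "NAME": "c"}]
remaining = delete_by_primary_key(table, [{"ID": 2}], ["ID"])
print(remaining)  # [{'ID': 1, 'NAME': 'a'}, {'ID': 3, 'NAME': 'c'}]
```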

deleteWithRdd(rdd, schema, schema_table_name)

Delete the rows of a table whose primary keys match those in the rdd (a join on the primary key columns). Be careful with column naming and case sensitivity.

Parameters
  • rdd – (RDD) The RDD containing the primary keys you would like to delete from the table

  • schema – (StructType) The schema of the rows in the RDD

  • schema_table_name – (str) Splice Machine Table

Returns

None

df(sql, to_lower=False)

Return a Spark Dataframe from the results of a Splice Machine SQL Query

Example
df = splice.df('SELECT * FROM MYSCHEMA.TABLE1 WHERE COL2 > 3')
Parameters
  • sql – (str) SQL Query (e.g. SELECT * FROM table1 WHERE col2 > 3)

  • to_lower – (bool) Whether or not to convert column names from the dataframe to lowercase. Default False

Returns

(DataFrame) A Spark DataFrame containing the results

dropTable(schema_and_or_table_name, table_name=None)

Drop a specified table.

Example
splice.dropTable('schemaName.tableName') 

# or

splice.dropTable('schemaName', 'tableName')
Parameters
  • schema_and_or_table_name – (str) Pass the schema name in this param when passing the table_name param, or pass schemaName.tableName in this param without passing the table_name param

  • table_name – (optional) (str) Table Name, used when schema_and_or_table_name contains only the schema name

Returns

None
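The two calling conventions shared by dropTable, dropTableIfExists and tableExists can be sketched as a small name-resolution helper. resolve_table_name is hypothetical, and rejecting a bare table name without a schema is an assumption made for the sketch.

```python
from typing import Optional, Tuple


def resolve_table_name(schema_and_or_table_name: str,
                       table_name: Optional[str] = None) -> Tuple[str, str]:
    """Hypothetical helper mirroring the two documented calling conventions:
    ('schemaName.tableName') or ('schemaName', 'tableName')."""
    if table_name is not None:
        # The first argument holds only the schema name.
        return schema_and_or_table_name, table_name
    schema, dot, table = schema_and_or_table_name.partition(".")
    if not dot or not schema or not table:
        raise ValueError("expected 'schemaName.tableName' when table_name is omitted")
    return schema, table


print(resolve_table_name("schemaName.tableName"))     # ('schemaName', 'tableName')
print(resolve_table_name("schemaName", "tableName"))  # ('schemaName', 'tableName')
```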

dropTableIfExists(schema_table_name, table_name=None)

Drop a table if it exists

Example
splice.dropTableIfExists('schemaName.tableName') 

# or

splice.dropTableIfExists('schemaName', 'tableName')
Parameters
  • schema_table_name – (str) Pass the schema name in this param when passing the table_name param, or pass schemaName.tableName in this param without passing the table_name param

  • table_name – (optional) (str) Table Name, used when schema_table_name contains only the schema name

Returns

None

execute(query_string)

Execute a query over JDBC

Example
splice.execute('DELETE FROM TABLE1 WHERE col2 > 3')
Parameters

query_string – (str) SQL Query (e.g. SELECT * FROM table1 WHERE col2 > 3)

Returns

None

executeUpdate(query_string)

Execute a DML or DDL statement (UPDATE, DELETE, DROP, etc.)

Example
splice.executeUpdate('DROP TABLE table1')
Parameters

query_string – (str) SQL statement (e.g. DROP TABLE table1)

Returns

None

export(dataframe, location, compression=False, replicationCount=1, fileEncoding=None, fieldSeparator=None, quoteCharacter=None)

Export a DataFrame in CSV format

Parameters
  • dataframe – (DataFrame) The data to export

  • location – (str) Destination directory

  • compression – (bool) Whether to compress the output or not

  • replicationCount – (int) Replication used for HDFS write

  • fileEncoding – (str) File encoding or None; defaults to UTF-8

  • fieldSeparator – (str) Field separator or None; defaults to ‘,’

  • quoteCharacter – (str) Quote character or None; defaults to a double quote (")

Returns

None
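To make the fieldSeparator and quoteCharacter defaults concrete, here is how Python's standard csv module formats rows with the same defaults (None falls back to ',' and '"'). This only illustrates CSV quoting; Splice Machine's exporter runs server-side and its exact quoting and escaping rules may differ. rows_to_csv is a hypothetical helper.

```python
import csv
import io
from typing import Iterable, Optional, Sequence


def rows_to_csv(rows: Iterable[Sequence[object]],
                field_separator: Optional[str] = None,
                quote_character: Optional[str] = None) -> str:
    """Format rows using the documented defaults: None falls back to ','
    as the separator and '"' as the quote character."""
    buf = io.StringIO()
    writer = csv.writer(buf,
                        delimiter=field_separator or ",",
                        quotechar=quote_character or '"',
                        lineterminator="\n")
    writer.writerows(rows)
    return buf.getvalue()


text = rows_to_csv([[1, "a,b"], [2, 'say "hi"']])
print(text)
# 1,"a,b"
# 2,"say ""hi"""
```

Fields containing the separator are quoted, and a quote character inside a field is doubled.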

exportBinary(dataframe, location, compression, e_format='parquet')

Export a DataFrame in binary format

Parameters
  • dataframe – (DataFrame) The data to export

  • location – (str) Destination directory

  • compression – (bool) Whether to compress the output or not

  • e_format – (str) Binary format to be used, currently only ‘parquet’ is supported. [Default ‘parquet’]

Returns

None

fileToTable(file_path, schema_table_name, primary_keys=None, drop_table=False, **pandas_args)

Load a file from the local filesystem or from a remote location and create a new table (or recreate an existing table), and load the data from the file into the new table. Any file_path that can be read by pandas should work here.

Parameters
  • file_path – (str) The local path or remote location of the file to load

  • schema_table_name – (str) The schema.table name

  • primary_keys – (List[str]) Primary keys for the table. Default None

  • drop_table – (bool) Whether or not to drop the table. If this is False and the table already exists, the function will fail. Default False

  • pandas_args – Extra keyword arguments to be passed to pd.read_csv. Any parameters accepted by pd.read_csv will work here

Returns

None

getConnection()

Return a connection to the database

getSchema(schema_table_name)

Return the schema of a table via JDBC.

Parameters

schema_table_name – (str) Full table name in the format of “schema.table”

Returns

(StructType) PySpark StructType representation of the table

insert(dataframe, schema_table_name, to_upper=True, create_table=False)

Insert a dataframe into a table (schema.table).

Parameters
  • dataframe – (Dataframe) The dataframe you would like to insert

  • schema_table_name – (str) The table in which you would like to insert the DF

  • to_upper – (bool) Whether the dataframe columns should be converted to uppercase before table creation. If False, the table will be created with lowercase columns. [Default True]

  • create_table – (bool) If True and the table does not exist at the time of the call, the table will first be created. [Default False]

Returns

None

insertRdd(rdd, schema, schema_table_name)

Insert an rdd into a table (schema.table)

Parameters
  • rdd – (RDD) The RDD you would like to insert

  • schema – (StructType) The schema of the rows in the RDD

  • schema_table_name – (str) The table in which you would like to insert the RDD

Returns

None

insertRddWithStatus(rdd, schema, schema_table_name, statusDirectory, badRecordsAllowed)

Insert an rdd into a table (schema.table) while tracking and limiting records that fail to insert. The status directory and number of badRecordsAllowed allow for duplicate primary keys to be written to a bad records file. If badRecordsAllowed is set to -1, all bad records will be written to the status directory.

Parameters
  • rdd – (RDD) The RDD you would like to insert

  • schema – (StructType) The schema of the rows in the RDD

  • schema_table_name – (str) The table in which you would like to insert the RDD

  • statusDirectory – (str) The status directory where the bad records file will be created

  • badRecordsAllowed – (int) The number of bad records allowed. -1 for unlimited

Returns

None

insertWithStatus(dataframe, schema_table_name, statusDirectory, badRecordsAllowed)

Insert a dataframe into a table (schema.table) while tracking and limiting records that fail to insert. The status directory and number of badRecordsAllowed allow for duplicate primary keys to be written to a bad records file. If badRecordsAllowed is set to -1, all bad records will be written to the status directory.

Parameters
  • dataframe – (Dataframe) The dataframe you would like to insert

  • schema_table_name – (str) The table in which you would like to insert the dataframe

  • statusDirectory – (str) The status directory where the bad records file will be created

  • badRecordsAllowed – (int) The number of bad records allowed. -1 for unlimited

Returns

None
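The interplay between duplicate primary keys and badRecordsAllowed can be simulated in plain Python. insert_with_status and TooManyBadRecords are hypothetical; the all-or-nothing failure when the limit is exceeded is an assumption about the real behaviour, and writing to the status directory is represented by the returned list.

```python
from typing import Dict, List, Sequence, Tuple

Row = Dict[str, object]


class TooManyBadRecords(Exception):
    """Raised when the number of rejected rows exceeds bad_records_allowed."""


def insert_with_status(table: Dict[tuple, Row], rows: List[Row],
                       key_cols: Sequence[str],
                       bad_records_allowed: int) -> Tuple[Dict[tuple, Row], List[Row]]:
    """Insert rows keyed by primary key; duplicate keys become bad records.

    Returns (new_table, bad_records). A limit of -1 means unlimited. When the
    limit is exceeded the whole insert fails and the input table is left
    untouched (assumed all-or-nothing behaviour).
    """
    new_table = dict(table)
    bad: List[Row] = []
    for row in rows:
        key = tuple(row[c] for c in key_cols)
        if key in new_table:
            bad.append(row)  # would go to the bad records file
            if bad_records_allowed != -1 and len(bad) > bad_records_allowed:
                raise TooManyBadRecords(f"{len(bad)} bad records > {bad_records_allowed}")
        else:
            new_table[key] = row
    return new_table, bad


existing = {(1,): {"ID": 1, "V": "a"}}
incoming = [{"ID": 1, "V": "dup"}, {"ID": 2, "V": "b"}]
result, bad = insert_with_status(existing, incoming, ["ID"], bad_records_allowed=1)
print(sorted(result), bad)  # [(1,), (2,)] [{'ID': 1, 'V': 'dup'}]
```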

internalDf(query_string)

SQL to DataFrame translation (lazy). Runs the query inside Splice Machine and sends the results to the Spark Adapter app.

Parameters

query_string – (str) SQL Query (e.g. SELECT * FROM table1 WHERE col2 > 3)

Returns

(DataFrame) PySpark DataFrame containing the result of query_string

internalRdd(schema_table_name, column_projection=None)

Table with projections in Splice mapped to an RDD. Runs the projection inside Splice Machine and sends the results to the Spark Adapter app as an rdd

Parameters
  • schema_table_name – (str) Accessed table

  • column_projection – (list of strings) Names of selected columns

Returns

(RDD[Row]) the result of the projection

mergeInto(dataframe, schema_table_name)

Rows in the dataframe whose primary key is not in schemaTableName will be inserted into the table; rows in the dataframe whose primary key is in schemaTableName will be used to update the table.

This implementation differs from upsert in a way that allows triggers to work.

Parameters
  • dataframe – (Dataframe) The dataframe you would like to merge in

  • schema_table_name – (str) The table in which you would like to merge in the dataframe

Returns

None
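The insert-or-update outcome described for mergeInto (and shared by upsert) can be modelled on a dict keyed by primary key. merge_into is hypothetical; it models only the resulting data, not triggers, which are what distinguish mergeInto from upsert.

```python
from typing import Dict, List, Sequence

Row = Dict[str, object]


def merge_into(table: Dict[tuple, Row], incoming: List[Row],
               key_cols: Sequence[str]) -> Dict[tuple, Row]:
    """Insert rows whose primary key is new; update rows whose key exists."""
    result = {key: dict(row) for key, row in table.items()}
    for row in incoming:
        key = tuple(row[c] for c in key_cols)
        if key in result:
            result[key].update(row)   # existing key: update with incoming values
        else:
            result[key] = dict(row)   # new key: insert
    return result


table = {(1,): {"ID": 1, "QTY": 5}}
merged = merge_into(table, [{"ID": 1, "QTY": 7}, {"ID": 2, "QTY": 3}], ["ID"])
print(merged)  # {(1,): {'ID': 1, 'QTY': 7}, (2,): {'ID': 2, 'QTY': 3}}
```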

mergeIntoWithRdd(rdd, schema, schema_table_name)

Rows in the rdd whose primary key is not in schemaTableName will be inserted into the table; rows in the rdd whose primary key is in schemaTableName will be used to update the table.

This implementation differs from upsertWithRdd in a way that allows triggers to work.

Parameters
  • rdd – (RDD) The RDD you would like to merge in

  • schema – (StructType) The schema of the rows in the RDD

  • schema_table_name – (str) The table in which you would like to merge in the RDD

Returns

None

pandasToSpark(pdf)

Convert a Pandas DataFrame to a Spark DataFrame, and attempt to handle Pandas NaNs in case of failure. Spark cannot handle Pandas NaN values in string columns (it treats them as numeric NaN values), so the occurrences are replaced with a temporary value and then converted back to null once the data is a Spark DataFrame.

Parameters

pdf – (pandas.DataFrame) The Pandas dataframe

Returns

The Spark DF
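The placeholder technique described for pandasToSpark can be sketched on a single column held as a Python list. NAN_PLACEHOLDER, mask_nans and restore_nulls are hypothetical names; the real method works on whole DataFrames and its placeholder value is not documented here.

```python
import math
from typing import List, Optional

# Hypothetical placeholder; any value that cannot occur in the data works.
NAN_PLACEHOLDER = "__pandas_nan__"


def mask_nans(values: List[object]) -> List[object]:
    """Replace float NaN entries in a string column with a placeholder so the
    column holds a single type (str) during conversion."""
    return [NAN_PLACEHOLDER if isinstance(v, float) and math.isnan(v) else v
            for v in values]


def restore_nulls(values: List[object]) -> List[Optional[object]]:
    """After conversion, turn the placeholder back into a real null (None)."""
    return [None if v == NAN_PLACEHOLDER else v for v in values]


column = ["a", float("nan"), "b"]  # how pandas often stores missing strings
masked = mask_nans(column)
restored = restore_nulls(masked)
print(masked)    # ['a', '__pandas_nan__', 'b']
print(restored)  # ['a', None, 'b']
```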

rdd(schema_table_name, column_projection=None)

Table with projections in Splice mapped to an RDD.

Parameters
  • schema_table_name – (str) Accessed table

  • column_projection – (list of strings) Names of selected columns

Returns

(RDD[Row]) the result of the projection

releaseSavepoint(savepoint)

Release the savepoint. Throws an exception if auto-commit is on.

Parameters

savepoint – (java.sql.Savepoint) A Savepoint.

Returns

None

replaceDataframeSchema(dataframe, schema_table_name)

Returns a dataframe with all column names replaced with the proper string case from the DB table

Parameters
  • dataframe – (Dataframe) A dataframe with column names to convert

  • schema_table_name – (str) The schema.table with the correct column cases to pull from the database

Returns

(DataFrame) A Spark DataFrame with the replaced schema
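The case-restoring rename performed by replaceDataframeSchema can be sketched as a case-insensitive lookup from the DataFrame's column names to the table's. match_column_case is hypothetical, and keeping unmatched names unchanged is an assumption.

```python
from typing import List, Sequence


def match_column_case(df_columns: Sequence[str],
                      table_columns: Sequence[str]) -> List[str]:
    """Return df_columns renamed to the case used by the database table.

    Matching is case-insensitive; columns with no counterpart in the table
    keep their original name.
    """
    by_lower = {name.lower(): name for name in table_columns}
    return [by_lower.get(name.lower(), name) for name in df_columns]


new_names = match_column_case(["id", "Name", "extra"], ["ID", "NAME"])
print(new_names)  # ['ID', 'NAME', 'extra']
```

With PySpark, such names could then be applied with dataframe.toDF(*new_names), which returns a DataFrame with the given column names.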

rollback()

Roll back the transaction. Throws an exception if auto-commit is on.

Returns

None

rollbackToSavepoint(savepoint)

Roll back to the savepoint. Throws an exception if auto-commit is on.

Parameters

savepoint – (java.sql.Savepoint) A Savepoint.

Returns

None

setAutoCommitOff()

Turn auto-commit off.

Returns

None

setAutoCommitOn()

Turn auto-commit on. Auto-commit is on by default when the class is instantiated.

Returns

None

setSavepoint()

Create and set an unnamed savepoint at the current point in the transaction. Throws an exception if auto-commit is on.

Returns

(java.sql.Savepoint) The unnamed Savepoint

setSavepointWithName(name)

Create and set a named savepoint at the current point in the transaction. Throws an exception if auto-commit is on.

Parameters

name – (str) The name of the Savepoint.

Returns

(java.sql.Savepoint) The named Savepoint
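Rolling back to a savepoint undoes only the work done after it, while the enclosing transaction stays open. As a runnable analogy only, the sketch below uses plain SQL SAVEPOINT, ROLLBACK TO and RELEASE statements through Python's sqlite3 module rather than the java.sql.Savepoint objects a PySpliceContext returns; the comments map each step to the corresponding method.

```python
import sqlite3

conn = sqlite3.connect(":memory:", isolation_level=None)  # explicit transactions
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)")

conn.execute("BEGIN")                        # transaction open (auto-commit off)
conn.execute("INSERT INTO t VALUES (1)")
conn.execute("SAVEPOINT before_second")      # ~ setSavepointWithName('before_second')
conn.execute("INSERT INTO t VALUES (2)")
conn.execute("ROLLBACK TO before_second")    # ~ rollbackToSavepoint(sp): undoes row 2 only
conn.execute("RELEASE before_second")        # ~ releaseSavepoint(sp)
conn.execute("COMMIT")                       # ~ commit()

ids = [row[0] for row in conn.execute("SELECT id FROM t ORDER BY id")]
print(ids)  # [1]
conn.close()
```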

splitAndInsert(dataframe, schema_table_name, sample_fraction)

Sample the dataframe, split the table, and insert the DataFrame into a schema.table. This corresponds to an INSERT INTO ... SELECT statement.

Parameters
  • dataframe – (DataFrame) Input data

  • schema_table_name – (str) Full table name in the format of “schema.table”

  • sample_fraction – (float) A value between 0 and 1 that specifies the fraction of the data in the DataFrame that should be sampled to determine the splits. For example, specify 0.005 to sample 0.5% of the data.

Returns

None
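Why a sample is enough to choose split points can be illustrated with the generic technique of taking evenly spaced quantiles from a sorted random sample. estimate_split_keys and num_regions are hypothetical; this is not Splice Machine's actual splitting code, only an assumed illustration of what sample_fraction controls.

```python
import random
from typing import List, Sequence


def estimate_split_keys(keys: Sequence[int], sample_fraction: float,
                        num_regions: int, seed: int = 0) -> List[int]:
    """Choose num_regions - 1 split keys from a random sample of the keys.

    Generic sampling technique assumed for illustration only.
    """
    if not 0 < sample_fraction <= 1:
        raise ValueError("sample_fraction must be in (0, 1]")
    rng = random.Random(seed)
    sample = sorted(k for k in keys if rng.random() < sample_fraction)
    if not sample or num_regions < 2:
        return []
    # Evenly spaced positions in the sorted sample approximate the quantiles.
    return [sample[i * len(sample) // num_regions] for i in range(1, num_regions)]


splits = estimate_split_keys(range(100_000), sample_fraction=0.05, num_regions=4)
print(splits)  # three keys close to the 25th, 50th and 75th percentiles
```

A larger sample_fraction gives split points closer to the true quantiles at the cost of reading more data.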

tableExists(schema_and_or_table_name, table_name=None)

Check whether or not a table exists

Example
splice.tableExists('schemaName.tableName')

# or

splice.tableExists('schemaName', 'tableName')
Parameters
  • schema_and_or_table_name – (str) Pass the schema name in this param when passing the table_name param, or pass schemaName.tableName in this param without passing the table_name param

  • table_name – (optional) (str) Table Name, used when schema_and_or_table_name contains only the schema name

Returns

(bool) whether or not the table exists

toLower(dataframe)

Returns a dataframe with all of the columns in lowercase

Parameters

dataframe – (Dataframe) The dataframe to convert to lowercase

toUpper(dataframe)

Returns a dataframe with all of the columns in uppercase

Parameters

dataframe – (Dataframe) The dataframe to convert to uppercase

transactional()

Check whether auto-commit is off.

Returns

(Boolean) True if auto-commit is off.

truncateTable(schema_table_name)

Truncate a table

Parameters

schema_table_name – (str) the full table name in the format “schema.table_name” which will be truncated

Returns

None

update(dataframe, schema_table_name)

Update data from a dataframe for a specified schema_table_name (schema.table). The primary key columns are required to identify the rows to update; any other columns provided will be updated in those rows.

Parameters
  • dataframe – (DataFrame) The dataframe containing the updated rows

  • schema_table_name – (str) Splice Machine Table

Returns

None

updateWithRdd(rdd, schema, schema_table_name)

Update data from an rdd for a specified schema_table_name (schema.table). The primary key columns are required to identify the rows to update; any other columns provided will be updated in those rows.

Parameters
  • rdd – (RDD) The RDD you would like to use for updating the table

  • schema – (StructType) The schema of the rows in the RDD

  • schema_table_name – (str) Splice Machine Table

Returns

None

upsert(dataframe, schema_table_name)

Upsert the data from a dataframe into a table (schema.table). If triggers fail when calling upsert, use the mergeInto function instead of upsert.

Parameters
  • dataframe – (Dataframe) The dataframe you would like to upsert

  • schema_table_name – (str) The table in which you would like to upsert the dataframe

Returns

None

upsertWithRdd(rdd, schema, schema_table_name)

Upsert the data from an RDD into a table (schema.table). If triggers fail when calling upsertWithRdd, use the mergeIntoWithRdd function instead of upsertWithRdd.

Parameters
  • rdd – (RDD) The RDD you would like to upsert

  • schema – (StructType) The schema of the rows in the RDD

  • schema_table_name – (str) The table in which you would like to upsert the RDD

Returns

None

class PySpliceContext(sparkSession, JDBC_URL=None, _unit_testing=False)

Bases: object

This class implements a SpliceMachineContext object (similar to the SparkContext object)

analyzeSchema(schema_name)

Analyze the schema, collecting statistics for all of its tables.

Parameters

schema_name – (str) Name of the schema for which statistics will be collected

Returns

None

analyzeTable(schema_table_name, estimateStatistics=False, samplePercent=10.0)

Collect statistics on a table.

Parameters
  • schema_table_name – (str) Full table name in the format of ‘schema.table’

  • estimateStatistics – (bool) If True, estimate the statistics from a sample of the rows instead of scanning every row. Default False

  • samplePercent – (float) The percentage of rows to be sampled when estimateStatistics is True. Default 10.0

Returns

None

bulkImportHFile(dataframe, schema_table_name, options)

Bulk import a DataFrame into a schema.table using HFiles

Parameters
  • dataframe – (DataFrame) Input data

  • schema_table_name – (str) Full table name in the format of “schema.table”

  • options – (Dict) Dictionary of options to be passed to --splice-properties; bulkImportDirectory is required

Returns

(int) Number of records imported

bulkImportHFileWithRdd(rdd, schema, schema_table_name, options)

Bulk import an RDD into a schema.table using HFiles

Parameters
  • rdd – (RDD) Input data

  • schema – (StructType) The schema of the rows in the RDD

  • schema_table_name – (str) Full table name in the format of “schema.table”

  • options – (Dict) Dictionary of options to be passed to --splice-properties; bulkImportDirectory is required

Returns

(int) Number of records imported

columnNamesCaseSensitive(caseSensitive)

Sets whether column names should be treated as case sensitive.

Parameters

caseSensitive – (boolean) True for case sensitive, False for not case sensitive

createAndInsertTable(dataframe, schema_table_name, primary_keys=None, create_table_options=None, to_upper=True)

Creates a schema.table (schema_table_name) from a dataframe and inserts the dataframe into the table

Parameters
  • dataframe – (DataFrame) The Spark DataFrame to base the table on

  • schema_table_name – (str) The schema.table to create

  • primary_keys – (List[str]) The primary keys. Default None

  • create_table_options – (str) Additional table-level SQL options. Default None

  • to_upper – (bool) Whether the dataframe columns should be converted to uppercase before table creation. If False, the table will be created with lowercase columns. Default True

Returns

None

createDataFrame(rdd, schema)

Creates a dataframe from a given rdd and schema.

Parameters
  • rdd – (RDD) Input data

  • schema – (StructType) The schema of the rows in the RDD

Returns

(DataFrame) The Spark DataFrame

createTable(dataframe, schema_table_name, primary_keys=None, create_table_options=None, to_upper=True, drop_table=False)

Creates a schema.table (schema_table_name) from a dataframe

Parameters
  • dataframe – (DataFrame) The Spark DataFrame to base the table on

  • schema_table_name – (str) The schema.table to create

  • primary_keys – (List[str]) The primary keys. Default None

  • create_table_options – (str) Additional table-level SQL options. Default None

  • to_upper – (bool) Whether the dataframe columns should be converted to uppercase before table creation. If False, the table will be created with lowercase columns. Default True

  • drop_table – (bool) Whether to drop the table if it exists. Default False. If False and the table exists, the function will throw an exception

Returns

None

createTableWithSchema(schema_table_name, schema, keys=None, create_table_options=None)

Creates a schema.table from a schema

Parameters
  • schema_table_name – (str) The schema.table to create

  • schema – (StructType) The schema that describes the columns of the table

  • keys – (List[str]) The primary keys. Default None

  • create_table_options – (str) The additional table-level SQL options. Default None

Returns

None

delete(dataframe, schema_table_name)

Delete the rows of a table whose primary keys match those in the dataframe (a join on the primary key columns). Be careful with column naming and case sensitivity.

Parameters
  • dataframe – (DataFrame) The dataframe containing the primary keys of the rows you would like to delete

  • schema_table_name – (str) Splice Machine table, in the format of “schema.table”

Returns

None

deleteWithRdd(rdd, schema, schema_table_name)

Delete the rows of a table whose primary keys match those in the rdd (a join on the primary key columns). Be careful with column naming and case sensitivity.

Parameters
  • rdd – (RDD) The RDD containing the primary keys you would like to delete from the table

  • schema – (StructType) The schema of the rows in the RDD

  • schema_table_name – (str) Splice Machine Table

Returns

None

df(sql, to_lower=False)

Return a Spark Dataframe from the results of a Splice Machine SQL Query

Example
df = splice.df('SELECT * FROM MYSCHEMA.TABLE1 WHERE COL2 > 3')
Parameters
  • sql – (str) SQL Query (e.g. SELECT * FROM table1 WHERE col2 > 3)

  • to_lower – (bool) Whether or not to convert column names from the dataframe to lowercase. Default False

Returns

(DataFrame) A Spark DataFrame containing the results

dropTable(schema_and_or_table_name, table_name=None)

Drop a specified table.

Example
splice.dropTable('schemaName.tableName') 

# or

splice.dropTable('schemaName', 'tableName')
Parameters
  • schema_and_or_table_name – (str) Pass the schema name in this param when passing the table_name param, or pass schemaName.tableName in this param without passing the table_name param

  • table_name – (optional) (str) Table Name, used when schema_and_or_table_name contains only the schema name

Returns

None

dropTableIfExists(schema_table_name, table_name=None)

Drop a table if it exists

Example
splice.dropTableIfExists('schemaName.tableName') 

# or

splice.dropTableIfExists('schemaName', 'tableName')
Parameters
  • schema_table_name – (str) Pass the schema name in this param when passing the table_name param, or pass schemaName.tableName in this param without passing the table_name param

  • table_name – (optional) (str) Table Name, used when schema_table_name contains only the schema name

Returns

None

execute(query_string)

Execute a query over JDBC

Example
splice.execute('DELETE FROM TABLE1 WHERE col2 > 3')
Parameters

query_string – (str) SQL Query (e.g. SELECT * FROM table1 WHERE col2 > 3)

Returns

None

executeUpdate(query_string)

Execute a DML or DDL statement (UPDATE, DELETE, DROP, etc.)

Example
splice.executeUpdate('DROP TABLE table1')
Parameters

query_string – (str) SQL statement (e.g. DROP TABLE table1)

Returns

None

export(dataframe, location, compression=False, replicationCount=1, fileEncoding=None, fieldSeparator=None, quoteCharacter=None)

Export a DataFrame in CSV format

Parameters
  • dataframe – (DataFrame) The data to export

  • location – (str) Destination directory

  • compression – (bool) Whether to compress the output or not

  • replicationCount – (int) Replication used for HDFS write

  • fileEncoding – (str) File encoding or None; defaults to UTF-8

  • fieldSeparator – (str) Field separator or None; defaults to ‘,’

  • quoteCharacter – (str) Quote character or None; defaults to a double quote (")

Returns

None

exportBinary(dataframe, location, compression, e_format='parquet')

Export a DataFrame in binary format

Parameters
  • dataframe – (DataFrame) The data to export

  • location – (str) Destination directory

  • compression – (bool) Whether to compress the output or not

  • e_format – (str) Binary format to be used, currently only ‘parquet’ is supported. [Default ‘parquet’]

Returns

None

fileToTable(file_path, schema_table_name, primary_keys=None, drop_table=False, **pandas_args)

Load a file from the local filesystem or from a remote location and create a new table (or recreate an existing table), and load the data from the file into the new table. Any file_path that can be read by pandas should work here.

Parameters
  • file_path – (str) The local path or remote location of the file to load

  • schema_table_name – (str) The schema.table name

  • primary_keys – (List[str]) Primary keys for the table. Default None

  • drop_table – (bool) Whether or not to drop the table. If this is False and the table already exists, the function will fail. Default False

  • pandas_args – Extra keyword arguments to be passed to pd.read_csv. Any parameters accepted by pd.read_csv will work here

Returns

None

getConnection()

Return a connection to the database

getSchema(schema_table_name)

Return the schema of a table via JDBC.

Parameters

schema_table_name – (str) Full table name in the format of “schema.table”

Returns

(StructType) PySpark StructType representation of the table

insert(dataframe, schema_table_name, to_upper=True, create_table=False)

Insert a dataframe into a table (schema.table).

Parameters
  • dataframe – (Dataframe) The dataframe you would like to insert

  • schema_table_name – (str) The table in which you would like to insert the DF

  • to_upper – (bool) Whether the dataframe columns should be converted to uppercase before table creation. If False, the table will be created with lowercase columns. [Default True]

  • create_table – (bool) If True and the table does not exist at the time of the call, the table will first be created. [Default False]

Returns

None

insertRdd(rdd, schema, schema_table_name)

Insert an rdd into a table (schema.table)

Parameters
  • rdd – (RDD) The RDD you would like to insert

  • schema – (StructType) The schema of the rows in the RDD

  • schema_table_name – (str) The table in which you would like to insert the RDD

Returns

None

insertRddWithStatus(rdd, schema, schema_table_name, statusDirectory, badRecordsAllowed)

Insert an rdd into a table (schema.table) while tracking and limiting records that fail to insert. The status directory and number of badRecordsAllowed allow for duplicate primary keys to be written to a bad records file. If badRecordsAllowed is set to -1, all bad records will be written to the status directory.

Parameters
  • rdd – (RDD) The RDD you would like to insert

  • schema – (StructType) The schema of the rows in the RDD

  • schema_table_name – (str) The table in which you would like to insert the RDD

  • statusDirectory – (str) The status directory where the bad records file will be created

  • badRecordsAllowed – (int) The number of bad records allowed. -1 for unlimited

Returns

None

insertWithStatus(dataframe, schema_table_name, statusDirectory, badRecordsAllowed)

Insert a dataframe into a table (schema.table) while tracking and limiting records that fail to insert. The status directory and number of badRecordsAllowed allow for duplicate primary keys to be written to a bad records file. If badRecordsAllowed is set to -1, all bad records will be written to the status directory.

Parameters
  • dataframe – (Dataframe) The dataframe you would like to insert

  • schema_table_name – (str) The table in which you would like to insert the dataframe

  • statusDirectory – (str) The status directory where the bad records file will be created

  • badRecordsAllowed – (int) The number of bad records allowed. -1 for unlimited

Returns

None

internalDf(query_string)

SQL to DataFrame translation (lazy). Runs the query inside Splice Machine and sends the results to the Spark Adapter app.

Parameters

query_string – (str) SQL Query (e.g. SELECT * FROM table1 WHERE col2 > 3)

Returns

(DataFrame) PySpark DataFrame containing the result of query_string

internalRdd(schema_table_name, column_projection=None)

Table with projections in Splice mapped to an RDD. Runs the projection inside Splice Machine and sends the results to the Spark Adapter app as an rdd

Parameters
  • schema_table_name – (str) Accessed table

  • column_projection – (list of strings) Names of selected columns

Returns

(RDD[Row]) the result of the projection

mergeInto(dataframe, schema_table_name)

Rows in the dataframe whose primary key is not in schemaTableName will be inserted into the table; rows in the dataframe whose primary key is in schemaTableName will be used to update the table.

This implementation differs from upsert in a way that allows triggers to work.

Parameters
  • dataframe – (Dataframe) The dataframe you would like to merge in

  • schema_table_name – (str) The table in which you would like to merge in the dataframe

Returns

None

mergeIntoWithRdd(rdd, schema, schema_table_name)

Rows in the rdd whose primary key is not in schemaTableName will be inserted into the table; rows in the rdd whose primary key is in schemaTableName will be used to update the table.

This implementation differs from upsertWithRdd in a way that allows triggers to work.

Parameters
  • rdd – (RDD) The RDD you would like to merge in

  • schema – (StructType) The schema of the rows in the RDD

  • schema_table_name – (str) The table in which you would like to merge in the RDD

Returns

None

pandasToSpark(pdf)[source]

Convert a Pandas DataFrame to a Spark DataFrame, handling Pandas NaN values if the direct conversion fails. Spark cannot handle Pandas NaN values in string columns (ironically, it treats them as the numeric NaN), so occurrences are replaced with a temporary value, which is converted back to null once the data is a Spark DataFrame.

Parameters

pdf – (DataFrame) The Pandas DataFrame to convert

Returns

(DataFrame) The resulting Spark DataFrame
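The NaN workaround described above is a mask-and-restore round trip. The sketch below shows that technique on plain lists of dicts instead of real Pandas and Spark DataFrames, so it runs with the standard library only; the sentinel string and the helper names mask_nans and unmask_nans are hypothetical and are not the library's internals.

```python
import math
from typing import AbstractSet, Any, Dict, List

Rows = List[Dict[str, Any]]
SENTINEL = "__NAN_PLACEHOLDER__"  # hypothetical temporary value


def _is_nan(value: Any) -> bool:
    return isinstance(value, float) and math.isnan(value)


def mask_nans(rows: Rows, string_columns: AbstractSet[str]) -> Rows:
    """Before conversion: replace float NaN in string columns with the sentinel."""
    return [
        {c: (SENTINEL if c in string_columns and _is_nan(v) else v) for c, v in row.items()}
        for row in rows
    ]


def unmask_nans(rows: Rows, string_columns: AbstractSet[str]) -> Rows:
    """After conversion: turn the sentinel back into None (SQL NULL)."""
    return [
        {c: (None if c in string_columns and v == SENTINEL else v) for c, v in row.items()}
        for row in rows
    ]
```

Only string columns are touched; a NaN in a numeric column is left alone, since Spark can represent it as a floating-point NaN.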

rdd(schema_table_name, column_projection=None)[source]

Table with projections in Splice mapped to an RDD.

Parameters
  • schema_table_name – (str) Accessed table

  • column_projection – (list of strings) Names of selected columns

Returns

(RDD[Row]) the result of the projection

replaceDataframeSchema(dataframe, schema_table_name)[source]

Returns a dataframe whose column names are replaced with the letter case used by the corresponding columns in the database table

Parameters
  • dataframe – (Dataframe) A dataframe with column names to convert

  • schema_table_name – (str) The schema.table with the correct column cases to pull from the database

Returns

(DataFrame) A Spark DataFrame with the replaced schema
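The renaming step can be pictured as a case-insensitive lookup from the dataframe's column names to the table's column names. This is a hypothetical sketch, not the library's code: the helper name match_column_case is invented, and leaving unmatched names unchanged is an assumption. With PySpark, the resulting list could be applied with DataFrame.toDF(*new_names).

```python
from typing import List


def match_column_case(df_columns: List[str], db_columns: List[str]) -> List[str]:
    """Map each dataframe column to the database's spelling, ignoring case."""
    by_upper = {c.upper(): c for c in db_columns}
    # Assumption: names with no counterpart in the table are kept as-is.
    return [by_upper.get(c.upper(), c) for c in df_columns]
```

For example, dataframe columns id and Name matched against table columns ID and NAME become ID and NAME.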

splitAndInsert(dataframe, schema_table_name, sample_fraction)[source]

Sample the dataframe to determine split points, split the table, and insert the dataframe into a schema.table. This corresponds to an INSERT INTO ... SELECT statement.

Parameters
  • dataframe – (DataFrame) Input data

  • schema_table_name – (str) Full table name in the format of “schema.table”

  • sample_fraction – (float) A value between 0 and 1 that specifies the fraction of the data in the dataframe that is sampled to determine the splits. For example, specify 0.005 to sample 0.5% of the data.

Returns

None
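To make sample_fraction concrete: with 100,000 rows and sample_fraction=0.005, about 500 rows are sampled. The sketch below shows one common way to derive split keys from such a sample, by sorting the sampled keys and taking evenly spaced quantiles. It is an illustration under assumptions, not Splice Machine's algorithm: the helper name sample_split_keys, the num_regions parameter, and the fixed seed are all hypothetical, since splitAndInsert exposes only sample_fraction.

```python
import random
from typing import List, Sequence


def sample_split_keys(
    keys: Sequence[int], sample_fraction: float, num_regions: int, seed: int = 0
) -> List[int]:
    """Pick num_regions - 1 split keys from a Bernoulli sample of the keys."""
    if not 0 < sample_fraction <= 1:
        raise ValueError("sample_fraction must be in (0, 1]")
    rng = random.Random(seed)
    # Keep each key with probability sample_fraction, then sort the sample.
    sample = sorted(k for k in keys if rng.random() < sample_fraction)
    if len(sample) < num_regions:
        return []  # too few sampled rows to choose meaningful boundaries
    step = len(sample) / num_regions
    # Evenly spaced quantiles of the sample become the region boundaries.
    return [sample[int(i * step)] for i in range(1, num_regions)]
```

A larger fraction gives boundaries closer to the true quantiles at the cost of scanning more rows during sampling.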

tableExists(schema_and_or_table_name, table_name=None)[source]

Check whether or not a table exists

Example
splice.tableExists('schemaName.tableName')

# or

splice.tableExists('schemaName', 'tableName')

Parameters
  • schema_and_or_table_name – (str) The schema name when table_name is also passed, or the full name in the form schemaName.tableName when table_name is omitted

  • table_name – (optional) (str) Table Name, used when schema_and_or_table_name contains only the schema name

Returns

(bool) whether or not the table exists
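The two calling conventions shown in the example resolve to the same (schema, table) pair. The helper below is a hypothetical sketch of that argument handling, not the library's code; in particular, rejecting a bare table name with ValueError is an assumption made here for clarity, and the real method may behave differently.

```python
from typing import Optional, Tuple


def resolve_table_name(
    schema_and_or_table_name: str, table_name: Optional[str] = None
) -> Tuple[str, str]:
    """Normalize tableExists-style arguments to (schema, table)."""
    if table_name is not None:
        # Two-argument form: the first argument is only the schema.
        return schema_and_or_table_name, table_name
    # One-argument form: expect 'schema.table'.
    schema, sep, table = schema_and_or_table_name.partition(".")
    if not sep or not schema or not table:
        # Assumption of this sketch: a bare table name is rejected.
        raise ValueError("expected 'schema.table' when table_name is omitted")
    return schema, table
```

Both resolve_table_name('schemaName.tableName') and resolve_table_name('schemaName', 'tableName') return ('schemaName', 'tableName').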

toLower(dataframe)[source]

Returns a dataframe with all column names converted to lowercase

Parameters

dataframe – (Dataframe) The dataframe whose column names will be converted to lowercase

toUpper(dataframe)[source]

Returns a dataframe with all column names converted to uppercase

Parameters

dataframe – (Dataframe) The dataframe whose column names will be converted to uppercase
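toLower and toUpper change column names, not the data. The sketch below computes the converted names with a hypothetical helper, convert_column_case; with PySpark the result could be applied as df.toDF(*convert_column_case(df.columns)). The duplicate-name check is an addition of this sketch, not documented behavior of either method: two columns that differ only by case, such as Id and ID, would collide after conversion.

```python
from typing import List


def convert_column_case(columns: List[str], upper: bool = False) -> List[str]:
    """Return column names in lowercase (default) or uppercase."""
    converted = [c.upper() if upper else c.lower() for c in columns]
    # Guard added in this sketch: names differing only by case would collide.
    if len(set(converted)) != len(converted):
        raise ValueError(f"case conversion produces duplicate column names: {converted}")
    return converted
```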

truncateTable(schema_table_name)[source]

Truncate a table

Parameters

schema_table_name – (str) The full name, in the format “schema.table_name”, of the table to truncate

Returns

None

update(dataframe, schema_table_name)[source]

Update the rows of a table (schema.table) with data from a dataframe. The dataframe must contain the table's primary key columns, which identify the rows to update; any other columns provided are updated in those rows.

Parameters
  • dataframe – (Dataframe) The dataframe you would like to update

  • schema_table_name – (str) Full name of the Splice Machine table in the format of “schema.table”

Returns

None
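A partial-column update, as described above, touches only the columns present in each row and leaves the others unchanged. The stdlib model below is hypothetical, not the library's code: rows are dicts, the table is a dict keyed by primary key, and the key column name "id" is assumed. It also assumes that rows whose key is absent from the table are skipped rather than inserted; use mergeInto or upsert when inserts are wanted.

```python
from typing import Dict, List


def update_model(table: Dict[int, dict], rows: List[dict], key: str = "id") -> int:
    """Toy model of update: match rows by primary key, overwrite provided columns."""
    updated = 0
    for row in rows:
        if key not in row:
            raise KeyError(f"primary key column {key!r} is required for update")
        current = table.get(row[key])
        if current is None:
            continue  # assumption: unknown keys are skipped, not inserted
        current.update({c: v for c, v in row.items() if c != key})
        updated += 1
    return updated
```

The same model describes updateWithRdd, with the RDD rows interpreted through the supplied StructType.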

updateWithRdd(rdd, schema, schema_table_name)[source]

Update the rows of a table (schema.table) with data from an RDD. The RDD rows must contain the table's primary key columns, which identify the rows to update; any other columns provided are updated in those rows.

Parameters
  • rdd – (RDD) The RDD you would like to use for updating the table

  • schema – (StructType) The schema of the rows in the RDD

  • schema_table_name – (str) Full name of the Splice Machine table in the format of “schema.table”

Returns

None

upsert(dataframe, schema_table_name)[source]

Upsert the data from a dataframe into a table (schema.table). If triggers fail when calling upsert, use the mergeInto function instead of upsert.

Parameters
  • dataframe – (Dataframe) The dataframe you would like to upsert

  • schema_table_name – (str) The table in which you would like to upsert the dataframe

Returns

None

upsertWithRdd(rdd, schema, schema_table_name)[source]

Upsert the data from an RDD into a table (schema.table). If triggers fail when calling upsertWithRdd, use the mergeIntoWithRdd function instead of upsertWithRdd.

Parameters
  • rdd – (RDD) The RDD you would like to upsert

  • schema – (StructType) The schema of the rows in the RDD

  • schema_table_name – (str) The table in which you would like to upsert the RDD

Returns

None

Module contents