splicemachine.spark package
Submodules
splicemachine.spark.context module
This module contains the classes for interacting with the database via the Splice Machine Native Spark DataSource (NSDS). For installation instructions, see the Getting Started guide. For use inside the K8s cluster, see PySpliceContext; for use outside the K8s cluster, see ExtPySpliceContext.
Copyright 2021 Splice Machine, Inc.
Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
-
class ExtPySpliceContext(sparkSession, JDBC_URL=None, kafkaServers='localhost:9092', kafkaPollTimeout=20000, _unit_testing=False)
Bases: splicemachine.spark.context.PySpliceContext
This class implements a SplicemachineContext object from com.splicemachine.spark2 for use outside of the K8s Cloud Service.
-
analyzeSchema(schema_name)
Analyze the schema.
- Parameters
schema_name – (str) The name of the schema for which statistics will be collected
- Returns
None
-
analyzeTable(schema_table_name, estimateStatistics=False, samplePercent=10.0)
Collect statistics on a table.
- Parameters
schema_table_name – (str) Full table name in the format “schema.table”
estimateStatistics – (bool) If True, use estimated statistics. Default False
samplePercent – (float) The percentage of rows to be sampled. Default 10.0
- Returns
None
-
autoCommitting()
Check whether auto-commit is on.
- Returns
(Boolean) True if auto-commit is on.
-
bulkImportHFile(dataframe, schema_table_name, options)
Bulk import HFiles from a dataframe into a schema.table.
- Parameters
dataframe – (DataFrame) The input data
schema_table_name – (str) Full table name in the format “schema.table”
options – (Dict) Dictionary of options to be passed to --splice-properties; bulkImportDirectory is required
- Returns
(int) Number of records imported
-
bulkImportHFileWithRdd(rdd, schema, schema_table_name, options)
Bulk import HFiles from an RDD into a schema.table.
- Parameters
rdd – (RDD) Input data
schema – (StructType) The schema of the rows in the RDD
schema_table_name – (str) Full table name in the format “schema.table”
options – (Dict) Dictionary of options to be passed to --splice-properties; bulkImportDirectory is required
- Returns
(int) Number of records imported
-
columnNamesCaseSensitive(caseSensitive)
Set whether column names should be treated as case-sensitive.
- Parameters
caseSensitive – (bool) True for case-sensitive, False for case-insensitive
-
createAndInsertTable(dataframe, schema_table_name, primary_keys=None, create_table_options=None, to_upper=True)
Create a schema.table (schema_table_name) from a dataframe and insert the dataframe into the table.
- Parameters
dataframe – (DataFrame) The Spark DataFrame to base the table on
schema_table_name – (str) The schema.table to create
primary_keys – (List[str]) The primary keys. Default None
create_table_options – (str) Additional table-level SQL options. Default None
to_upper – (bool) Whether the dataframe columns should be converted to uppercase before table creation. If False, the table is created with lowercase columns. Default True
drop_table – (bool) Whether to drop the table if it exists. Default False. If False and the table exists, the function throws an exception
- Returns
None
-
createDataFrame(rdd, schema)
Create a DataFrame from the given RDD and schema.
- Parameters
rdd – (RDD) Input data
schema – (StructType) The schema of the rows in the RDD
- Returns
(DataFrame) The Spark DataFrame
-
createTable(dataframe, schema_table_name, primary_keys=None, create_table_options=None, to_upper=True, drop_table=False)
Create a schema.table (schema_table_name) from a dataframe.
- Parameters
dataframe – (DataFrame) The Spark DataFrame to base the table on
schema_table_name – (str) The schema.table to create
primary_keys – (List[str]) The primary keys. Default None
create_table_options – (str) Additional table-level SQL options. Default None
to_upper – (bool) Whether the dataframe columns should be converted to uppercase before table creation. If False, the table is created with lowercase columns. Default True
drop_table – (bool) Whether to drop the table if it exists. Default False. If False and the table exists, the function throws an exception
- Returns
None
-
createTableWithSchema(schema_table_name, schema, keys=None, create_table_options=None)
Create a schema.table from a schema.
- Parameters
schema_table_name – (str) The schema.table to create
schema – (StructType) The schema that describes the columns of the table
keys – (List[str]) The primary keys. Default None
create_table_options – (str) The additional table-level SQL options. Default None
- Returns
None
-
delete(dataframe, schema_table_name)
Delete the rows of the table whose primary keys match the rows of the dataframe. Be careful with column naming and case sensitivity.
- Parameters
dataframe – (DataFrame) The dataframe containing the primary keys of the rows you would like to delete
schema_table_name – (str) The Splice Machine table, in the format “schema.table”
- Returns
None
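To make the primary-key matching behind delete concrete, here is a plain-Python model in which lists of dicts stand in for the table and the dataframe. delete_by_keys is a hypothetical helper that does not call Splice Machine; it only illustrates the join-on-primary-key semantics and why column naming matters.

```python
from typing import Dict, List, Sequence

Row = Dict[str, object]


def delete_by_keys(table: List[Row], to_delete: List[Row],
                   primary_keys: Sequence[str]) -> List[Row]:
    """Hypothetical model of delete(): drop the table rows whose primary-key
    values appear in to_delete. Column names are matched exactly, so 'id'
    and 'ID' are treated as different columns."""
    doomed = {tuple(r[k] for k in primary_keys) for r in to_delete}
    return [r for r in table if tuple(r[k] for k in primary_keys) not in doomed]


table = [{"ID": 1, "NAME": "a"}, {"ID": 2, "NAME": "b"}, {"ID": 3, "NAME": "c"}]
remaining = delete_by_keys(table, [{"ID": 2}], primary_keys=["ID"])
print(remaining)  # [{'ID': 1, 'NAME': 'a'}, {'ID': 3, 'NAME': 'c'}]
```

Because keys are looked up by exact column name, passing {"id": 2} instead of {"ID": 2} raises a KeyError in this model, which mirrors the warning about column naming and case sensitivity.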
-
deleteWithRdd(rdd, schema, schema_table_name)
Delete the rows of the table whose primary keys match the rows of the RDD. Be careful with column naming and case sensitivity.
- Parameters
rdd – (RDD) The RDD containing the primary keys you would like to delete from the table
schema – (StructType) The schema of the rows in the RDD
schema_table_name – (str) The Splice Machine table, in the format “schema.table”
- Returns
None
-
df(sql, to_lower=False)
Return a Spark DataFrame from the results of a Splice Machine SQL query.
- Example
df = splice.df('SELECT * FROM MYSCHEMA.TABLE1 WHERE COL2 > 3')
- Parameters
sql – (str) SQL query (e.g. SELECT * FROM table1 WHERE col2 > 3)
to_lower – (bool) Whether to convert the column names of the resulting DataFrame to lowercase. Default False
- Returns
(Dataframe) A Spark DataFrame containing the results
-
dropTable(schema_and_or_table_name, table_name=None)
Drop the specified table.
- Example
splice.dropTable('schemaName.tableName') # or splice.dropTable('schemaName', 'tableName')
- Parameters
schema_and_or_table_name – (str) Pass the schema name in this param when passing the table_name param, or pass schemaName.tableName in this param without passing the table_name param
table_name – (optional) (str) Table Name, used when schema_and_or_table_name contains only the schema name
- Returns
None
-
dropTableIfExists(schema_table_name, table_name=None)
Drop a table if it exists.
- Example
splice.dropTableIfExists('schemaName.tableName') # or splice.dropTableIfExists('schemaName', 'tableName')
- Parameters
schema_table_name – (str) Pass the schema name in this param when passing the table_name param, or pass schemaName.tableName in this param without passing the table_name param
table_name – (optional) (str) Table Name, used when schema_table_name contains only the schema name
- Returns
None
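dropTable, dropTableIfExists and tableExists all accept either a single "schema.table" string or a schema name plus a separate table_name. The sketch below shows one way such arguments could be normalized into a single name. normalize_table_name is a hypothetical helper written for illustration, not part of the splicemachine API, and rejecting a bare table name without a schema is an assumption.

```python
from typing import Optional


def normalize_table_name(schema_and_or_table_name: str,
                         table_name: Optional[str] = None) -> str:
    """Hypothetical helper: return a single 'schema.table' string.

    Accepts normalize_table_name('schema.table') or
    normalize_table_name('schema', 'table'), mirroring the two calling
    conventions described above.
    """
    if table_name is not None:
        # The first argument holds only the schema name
        return f"{schema_and_or_table_name}.{table_name}"
    if "." not in schema_and_or_table_name:
        # Assumption: a bare table name without a schema is rejected
        raise ValueError("expected 'schema.table' or a separate table_name")
    return schema_and_or_table_name


print(normalize_table_name("schemaName.tableName"))  # schemaName.tableName
print(normalize_table_name("schemaName", "tableName"))  # schemaName.tableName
```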
-
execute(query_string)
Execute a query over JDBC.
- Example
splice.execute('DELETE FROM TABLE1 WHERE col2 > 3')
- Parameters
query_string – (str) SQL query (e.g. SELECT * FROM table1 WHERE col2 > 3)
- Returns
None
-
executeUpdate(query_string)
Execute a DML query (UPDATE, DELETE, DROP, etc.).
- Example
splice.executeUpdate('DROP TABLE table1')
- Parameters
query_string – (str) SQL query (e.g. DROP TABLE table1)
- Returns
None
-
export(dataframe, location, compression=False, replicationCount=1, fileEncoding=None, fieldSeparator=None, quoteCharacter=None)
Export a DataFrame as CSV.
- Parameters
dataframe – (DataFrame) The data to export
location – (str) Destination directory
compression – (bool) Whether to compress the output. Default False
replicationCount – (int) Replication factor used for the HDFS write. Default 1
fileEncoding – (str) File encoding, or None for the default (UTF-8)
fieldSeparator – (str) Field separator, or None for the default, a comma (,)
quoteCharacter – (str) Quote character, or None for the default, a double quote (")
- Returns
None
-
exportBinary(dataframe, location, compression, e_format='parquet')
Export a DataFrame in a binary format.
- Parameters
dataframe – (DataFrame) The data to export
location – (str) Destination directory
compression – (bool) Whether to compress the output
e_format – (str) Binary format to use; currently only 'parquet' is supported. Default 'parquet'
- Returns
None
-
fileToTable(file_path, schema_table_name, primary_keys=None, drop_table=False, **pandas_args)
Load a file from the local filesystem or from a remote location, create a new table (or recreate an existing one), and load the data from the file into it. Any file_path that pandas can read should work here.
- Parameters
file_path – (str) The path of the file to load
schema_table_name – (str) The schema.table name
primary_keys – (List[str]) Primary keys for the table. Default None
drop_table – (bool) Whether to drop the table if it already exists. If False and the table already exists, the function fails. Default False
pandas_args – Extra keyword arguments passed to pd.read_csv. Any parameter accepted by pd.read_csv works here
- Returns
None
-
getConnection()
Return a connection to the database.
-
getSchema(schema_table_name)
Return the schema of a table via JDBC.
- Parameters
schema_table_name – (str) Full table name in the format “schema.table”
- Returns
(StructType) PySpark StructType representation of the table
-
insert(dataframe, schema_table_name, to_upper=True, create_table=False)
Insert a dataframe into a table (schema.table).
- Parameters
dataframe – (DataFrame) The dataframe you would like to insert
schema_table_name – (str) The table into which you would like to insert the DataFrame
to_upper – (bool) Whether the dataframe columns should be converted to uppercase before table creation. If False, the table is created with lowercase columns. Default True
create_table – (bool) If True and the table does not exist at the time of the call, the table is created first. Default False
- Returns
None
-
insertRdd(rdd, schema, schema_table_name)
Insert an RDD into a table (schema.table).
- Parameters
rdd – (RDD) The RDD you would like to insert
schema – (StructType) The schema of the rows in the RDD
schema_table_name – (str) The table in which you would like to insert the RDD
- Returns
None
-
insertRddWithStatus(rdd, schema, schema_table_name, statusDirectory, badRecordsAllowed)
Insert an RDD into a table (schema.table) while tracking and limiting the records that fail to insert. With statusDirectory and badRecordsAllowed set, records that fail, such as rows with duplicate primary keys, are written to a bad-records file. If badRecordsAllowed is -1, all bad records are written to the status directory.
- Parameters
rdd – (RDD) The RDD you would like to insert
schema – (StructType) The schema of the rows in the RDD
schema_table_name – (str) The table into which you would like to insert the RDD
statusDirectory – (str) The status directory where the bad-records file will be created
badRecordsAllowed – (int) The number of bad records allowed; -1 for unlimited
- Returns
None
-
insertWithStatus(dataframe, schema_table_name, statusDirectory, badRecordsAllowed)
Insert a dataframe into a table (schema.table) while tracking and limiting the records that fail to insert. With statusDirectory and badRecordsAllowed set, records that fail, such as rows with duplicate primary keys, are written to a bad-records file. If badRecordsAllowed is -1, all bad records are written to the status directory.
- Parameters
dataframe – (DataFrame) The dataframe you would like to insert
schema_table_name – (str) The table into which you would like to insert the dataframe
statusDirectory – (str) The status directory where the bad-records file will be created
badRecordsAllowed – (int) The number of bad records allowed; -1 for unlimited
- Returns
None
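The following plain-Python model illustrates the bad-record accounting described for insertWithStatus and insertRddWithStatus. It assumes that a row is "bad" when its primary key already exists (in the table or earlier in the same batch) and that exceeding badRecordsAllowed aborts the whole insert; both are assumptions for illustration. insert_with_status and TooManyBadRecords are hypothetical names, and bad records are returned in a list here rather than written to a file in statusDirectory.

```python
from typing import Dict, List, Sequence, Tuple

Row = Dict[str, object]


class TooManyBadRecords(Exception):
    """Hypothetical error raised when the bad-record limit is exceeded."""


def insert_with_status(table: List[Row], rows: List[Row],
                       primary_keys: Sequence[str],
                       bad_records_allowed: int) -> Tuple[List[Row], List[Row]]:
    """Return (new_table, bad_records). A limit of -1 means unlimited.

    Rows whose primary key is already present (in the table or earlier in
    the batch) are collected as bad records instead of being inserted.
    """
    seen = {tuple(r[k] for k in primary_keys) for r in table}
    inserted: List[Row] = []
    bad: List[Row] = []
    for row in rows:
        key = tuple(row[k] for k in primary_keys)
        if key in seen:
            bad.append(row)
            # Assumed behavior: going over the limit aborts the whole insert,
            # so nothing from this batch is added to the table.
            if bad_records_allowed != -1 and len(bad) > bad_records_allowed:
                raise TooManyBadRecords(
                    f"{len(bad)} bad records exceed the limit of {bad_records_allowed}")
        else:
            seen.add(key)
            inserted.append(row)
    return table + inserted, bad


new_table, bad = insert_with_status([{"ID": 1}], [{"ID": 1}, {"ID": 2}], ["ID"],
                                    bad_records_allowed=-1)
print(new_table, bad)  # [{'ID': 1}, {'ID': 2}] [{'ID': 1}]
```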
-
internalDf(query_string)
SQL-to-DataFrame translation (lazy). Runs the query inside Splice Machine and sends the results to the Spark Adapter app.
- Parameters
query_string – (str) SQL query (e.g. SELECT * FROM table1 WHERE col2 > 3)
- Returns
(DataFrame) A PySpark DataFrame containing the result of query_string
-
internalRdd(schema_table_name, column_projection=None)
Map a Splice Machine table, with an optional column projection, to an RDD. Runs the projection inside Splice Machine and sends the results to the Spark Adapter app as an RDD.
- Parameters
schema_table_name – (str) The table to access
column_projection – (list of strings) Names of selected columns
- Returns
(RDD[Row]) the result of the projection
-
mergeInto(dataframe, schema_table_name)
Rows in the dataframe whose primary key is not in schema_table_name are inserted into the table; rows whose primary key is already in schema_table_name are used to update the table.
Unlike upsert, this implementation allows triggers to work.
- Parameters
dataframe – (Dataframe) The dataframe you would like to merge in
schema_table_name – (str) The table in which you would like to merge in the dataframe
- Returns
None
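The insert-or-update rule described for mergeInto (and shared with upsert) can be modeled in plain Python, with lists of dicts standing in for the table and the dataframe. merge_into is a hypothetical helper; it does not model triggers, which is where mergeInto and upsert differ according to the description above.

```python
from typing import Dict, List, Sequence

Row = Dict[str, object]


def merge_into(table: List[Row], rows: List[Row],
               primary_keys: Sequence[str]) -> List[Row]:
    """Hypothetical model of mergeInto(): rows with a new primary key are
    appended; rows with an existing key update the matching row."""
    result = [dict(r) for r in table]  # copy so the input is not mutated
    index = {tuple(r[k] for k in primary_keys): i for i, r in enumerate(result)}
    for row in rows:
        key = tuple(row[k] for k in primary_keys)
        if key in index:
            result[index[key]].update(row)  # existing key: update the row
        else:
            index[key] = len(result)
            result.append(dict(row))        # new key: insert the row
    return result


table = [{"ID": 1, "QTY": 5}]
merged = merge_into(table, [{"ID": 1, "QTY": 7}, {"ID": 2, "QTY": 3}], ["ID"])
print(merged)  # [{'ID': 1, 'QTY': 7}, {'ID': 2, 'QTY': 3}]
```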
-
mergeIntoWithRdd(rdd, schema, schema_table_name)
Rows in the RDD whose primary key is not in schema_table_name are inserted into the table; rows whose primary key is already in schema_table_name are used to update the table.
Unlike upsertWithRdd, this implementation allows triggers to work.
- Parameters
rdd – (RDD) The RDD you would like to merge in
schema – (StructType) The schema of the rows in the RDD
schema_table_name – (str) The table in which you would like to merge in the RDD
- Returns
None
-
pandasToSpark(pdf)
Convert a Pandas DataFrame to a Spark DataFrame, handling Pandas NaN values if the conversion fails. Spark cannot handle Pandas NaN values in string columns (ironically, it treats NaN as a number), so the occurrences are replaced with a temporary value and converted back to null once the data is a Spark DataFrame.
- Parameters
pdf – (pandas.DataFrame) The Pandas DataFrame
- Returns
(DataFrame) The Spark DataFrame
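The placeholder technique described for pandasToSpark can be sketched on a plain Python list standing in for one string column. PLACEHOLDER, mask_nan and unmask are hypothetical names; the library's actual temporary value is not documented here, and no Pandas or Spark call is made.

```python
import math
from typing import List, Optional

# Hypothetical temporary value used to stand in for NaN in a string column
PLACEHOLDER = "__NAN_PLACEHOLDER__"


def mask_nan(column: List[object]) -> List[object]:
    """Replace float NaN entries in a string column with the placeholder."""
    return [PLACEHOLDER if isinstance(v, float) and math.isnan(v) else v
            for v in column]


def unmask(column: List[object]) -> List[Optional[object]]:
    """After conversion, turn the placeholder back into a real null (None)."""
    return [None if v == PLACEHOLDER else v for v in column]


names = ["alice", float("nan"), "carol"]
masked = mask_nan(names)  # every value is now a str, so the column type is uniform
print(unmask(masked))     # ['alice', None, 'carol']
```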
-
rdd(schema_table_name, column_projection=None)
Map a Splice Machine table, with an optional column projection, to an RDD.
- Parameters
schema_table_name – (str) The table to access
column_projection – (list of strings) Names of selected columns
- Returns
(RDD[Row]) the result of the projection
-
releaseSavepoint(savepoint)
Release the savepoint. Throws an exception if auto-commit is on.
- Parameters
savepoint – (java.sql.Savepoint) A Savepoint
- Returns
None
-
replaceDataframeSchema(dataframe, schema_table_name)
Return a DataFrame whose column names are replaced with the matching case from the database table.
- Parameters
dataframe – (Dataframe) A dataframe with column names to convert
schema_table_name – (str) The schema.table with the correct column cases to pull from the database
- Returns
(DataFrame) A Spark DataFrame with the replaced schema
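The renaming performed by replaceDataframeSchema can be modeled as a case-insensitive lookup from dataframe column names to the database's column names. replace_column_case is a hypothetical helper that works on plain lists of names; leaving unmatched columns unchanged is an assumption.

```python
from typing import Dict, List


def replace_column_case(df_columns: List[str], db_columns: List[str]) -> List[str]:
    """Hypothetical model of replaceDataframeSchema(): rename each dataframe
    column to the database column with the same name, ignoring case.
    Columns with no match are left unchanged (an assumption)."""
    by_lower: Dict[str, str] = {c.lower(): c for c in db_columns}
    return [by_lower.get(c.lower(), c) for c in df_columns]


renamed = replace_column_case(["id", "Name", "extra"], ["ID", "NAME"])
print(renamed)  # ['ID', 'NAME', 'extra']
```

In PySpark, a list of new names like this can be applied with DataFrame.toDF(*names); toUpper and toLower would amount to mapping str.upper or str.lower over the names instead of looking them up.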
-
rollbackToSavepoint(savepoint)
Roll back to the savepoint. Throws an exception if auto-commit is on.
- Parameters
savepoint – (java.sql.Savepoint) A Savepoint
- Returns
None
-
setAutoCommitOn()
Turn auto-commit on. Auto-commit is on by default when the class is instantiated.
- Returns
None
-
setSavepoint()
Create and set an unnamed savepoint at the current point in the transaction. Throws an exception if auto-commit is on.
- Returns
(java.sql.Savepoint) The unnamed Savepoint
-
setSavepointWithName(name)
Create and set a named savepoint at the current point in the transaction. Throws an exception if auto-commit is on.
- Parameters
name – (str) The name of the Savepoint
- Returns
(java.sql.Savepoint) The named Savepoint
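The savepoint methods above operate on JDBC savepoints (java.sql.Savepoint) and require auto-commit to be off. The same transaction pattern can be tried locally with Python's built-in sqlite3 module. This is an analogy in plain SQL, not the ExtPySpliceContext API; the comments only indicate which method each statement corresponds to conceptually.

```python
import sqlite3

# isolation_level=None lets us issue BEGIN/SAVEPOINT/COMMIT explicitly
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)")

conn.execute("BEGIN")                      # like turning auto-commit off
conn.execute("INSERT INTO t VALUES (1)")
conn.execute("SAVEPOINT sp1")              # like setSavepointWithName('sp1')
conn.execute("INSERT INTO t VALUES (2)")
conn.execute("ROLLBACK TO SAVEPOINT sp1")  # like rollbackToSavepoint(...): undoes id 2
conn.execute("RELEASE SAVEPOINT sp1")      # like releaseSavepoint(...)
conn.execute("COMMIT")                     # id 1 is made permanent

rows = [r[0] for r in conn.execute("SELECT id FROM t ORDER BY id")]
print(rows)  # [1]
```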
-
splitAndInsert(dataframe, schema_table_name, sample_fraction)
Sample the dataframe, split the table based on the sample, and insert the DataFrame into schema.table. This corresponds to an INSERT INTO ... SELECT statement.
- Parameters
dataframe – (DataFrame) Input data
schema_table_name – (str) Full table name in the format “schema.table”
sample_fraction – (float) A value between 0 and 1 specifying the fraction of the DataFrame to sample when determining the splits. For example, specify 0.005 to sample 0.5% of the data.
- Returns
None
-
tableExists(schema_and_or_table_name, table_name=None)
Check whether a table exists.
- Example
splice.tableExists('schemaName.tableName') # or splice.tableExists('schemaName', 'tableName')
- Parameters
schema_and_or_table_name – (str) Pass the schema name in this param when passing the table_name param, or pass schemaName.tableName in this param without passing the table_name param
table_name – (optional) (str) Table Name, used when schema_and_or_table_name contains only the schema name
- Returns
(bool) whether or not the table exists
-
toLower(dataframe)
Return a DataFrame with all column names in lowercase.
- Parameters
dataframe – (Dataframe) The dataframe to convert to lowercase
-
toUpper(dataframe)
Return a DataFrame with all column names in uppercase.
- Parameters
dataframe – (Dataframe) The dataframe to convert to uppercase
-
transactional()
Check whether auto-commit is off.
- Returns
(Boolean) True if auto-commit is off.
-
truncateTable(schema_table_name)
Truncate a table.
- Parameters
schema_table_name – (str) The full name of the table to truncate, in the format “schema.table_name”
- Returns
None
-
update(dataframe, schema_table_name)
Update a table (schema.table) using the data in a dataframe. The primary key columns are required to identify the rows; any other columns provided are updated in those rows.
- Parameters
dataframe – (DataFrame) The dataframe you would like to update the table with
schema_table_name – (str) The Splice Machine table, in the format “schema.table”
- Returns
None
-
updateWithRdd(rdd, schema, schema_table_name)
Update a table (schema.table) using the data in an RDD. The primary key columns are required to identify the rows; any other columns provided are updated in those rows.
- Parameters
rdd – (RDD) The RDD you would like to use for updating the table
schema – (StructType) The schema of the rows in the RDD
schema_table_name – (str) The Splice Machine table, in the format “schema.table”
- Returns
None
-
upsert(dataframe, schema_table_name)
Upsert the data from a dataframe into a table (schema.table). If triggers fail when calling upsert, use mergeInto instead.
- Parameters
dataframe – (DataFrame) The dataframe you would like to upsert
schema_table_name – (str) The table into which you would like to upsert the dataframe
- Returns
None
-
upsertWithRdd(rdd, schema, schema_table_name)
Upsert the data from an RDD into a table (schema.table). If triggers fail when calling upsertWithRdd, use mergeIntoWithRdd instead.
- Parameters
rdd – (RDD) The RDD you would like to upsert
schema – (StructType) The schema of the rows in the RDD
schema_table_name – (str) The table in which you would like to upsert the RDD
- Returns
None
-
-
class PySpliceContext(sparkSession, JDBC_URL=None, _unit_testing=False)
Bases: object
This class implements a SpliceMachineContext object (similar to the SparkContext object).
-
analyzeSchema(schema_name)
Analyze the schema.
- Parameters
schema_name – (str) The name of the schema for which statistics will be collected
- Returns
None
-
analyzeTable(schema_table_name, estimateStatistics=False, samplePercent=10.0)
Collect statistics on a table.
- Parameters
schema_table_name – (str) Full table name in the format “schema.table”
estimateStatistics – (bool) If True, use estimated statistics. Default False
samplePercent – (float) The percentage of rows to be sampled. Default 10.0
- Returns
None
-
bulkImportHFile(dataframe, schema_table_name, options)
Bulk import HFiles from a dataframe into a schema.table.
- Parameters
dataframe – (DataFrame) The input data
schema_table_name – (str) Full table name in the format “schema.table”
options – (Dict) Dictionary of options to be passed to --splice-properties; bulkImportDirectory is required
- Returns
(int) Number of records imported
-
bulkImportHFileWithRdd(rdd, schema, schema_table_name, options)
Bulk import HFiles from an RDD into a schema.table.
- Parameters
rdd – (RDD) Input data
schema – (StructType) The schema of the rows in the RDD
schema_table_name – (str) Full table name in the format “schema.table”
options – (Dict) Dictionary of options to be passed to --splice-properties; bulkImportDirectory is required
- Returns
(int) Number of records imported
-
columnNamesCaseSensitive(caseSensitive)
Set whether column names should be treated as case-sensitive.
- Parameters
caseSensitive – (bool) True for case-sensitive, False for case-insensitive
-
createAndInsertTable(dataframe, schema_table_name, primary_keys=None, create_table_options=None, to_upper=True)
Create a schema.table (schema_table_name) from a dataframe and insert the dataframe into the table.
- Parameters
dataframe – (DataFrame) The Spark DataFrame to base the table on
schema_table_name – (str) The schema.table to create
primary_keys – (List[str]) The primary keys. Default None
create_table_options – (str) Additional table-level SQL options. Default None
to_upper – (bool) Whether the dataframe columns should be converted to uppercase before table creation. If False, the table is created with lowercase columns. Default True
drop_table – (bool) Whether to drop the table if it exists. Default False. If False and the table exists, the function throws an exception
- Returns
None
-
createDataFrame(rdd, schema)
Create a DataFrame from the given RDD and schema.
- Parameters
rdd – (RDD) Input data
schema – (StructType) The schema of the rows in the RDD
- Returns
(DataFrame) The Spark DataFrame
-
createTable(dataframe, schema_table_name, primary_keys=None, create_table_options=None, to_upper=True, drop_table=False)
Create a schema.table (schema_table_name) from a dataframe.
- Parameters
dataframe – (DataFrame) The Spark DataFrame to base the table on
schema_table_name – (str) The schema.table to create
primary_keys – (List[str]) The primary keys. Default None
create_table_options – (str) Additional table-level SQL options. Default None
to_upper – (bool) Whether the dataframe columns should be converted to uppercase before table creation. If False, the table is created with lowercase columns. Default True
drop_table – (bool) Whether to drop the table if it exists. Default False. If False and the table exists, the function throws an exception
- Returns
None
-
createTableWithSchema(schema_table_name, schema, keys=None, create_table_options=None)
Create a schema.table from a schema.
- Parameters
schema_table_name – (str) The schema.table to create
schema – (StructType) The schema that describes the columns of the table
keys – (List[str]) The primary keys. Default None
create_table_options – (str) The additional table-level SQL options. Default None
- Returns
None
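As an illustration of what creating a table from a schema involves, the sketch below turns a list of (column name, Spark type name) pairs into a CREATE TABLE statement with an optional primary key and trailing table options. create_table_ddl and TYPE_MAP are hypothetical; the type mapping is illustrative only and is not the library's actual mapping, and appending create_table_options at the end is an assumption.

```python
from typing import List, Optional, Sequence, Tuple

# Illustrative mapping from Spark type names to SQL column types (an assumption)
TYPE_MAP = {"string": "VARCHAR(255)", "integer": "INTEGER", "long": "BIGINT",
            "double": "DOUBLE", "boolean": "BOOLEAN", "timestamp": "TIMESTAMP"}


def create_table_ddl(schema_table_name: str,
                     fields: Sequence[Tuple[str, str]],
                     keys: Optional[List[str]] = None,
                     create_table_options: Optional[str] = None) -> str:
    """Hypothetical helper: build CREATE TABLE DDL from (name, type) pairs."""
    cols = [f"{name} {TYPE_MAP[spark_type]}" for name, spark_type in fields]
    if keys:
        cols.append(f"PRIMARY KEY({', '.join(keys)})")
    ddl = f"CREATE TABLE {schema_table_name} ({', '.join(cols)})"
    if create_table_options:
        ddl += f" {create_table_options}"  # assumed placement of extra options
    return ddl


ddl = create_table_ddl("MYSCHEMA.T1", [("ID", "integer"), ("NAME", "string")], keys=["ID"])
print(ddl)  # CREATE TABLE MYSCHEMA.T1 (ID INTEGER, NAME VARCHAR(255), PRIMARY KEY(ID))
```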
-
delete(dataframe, schema_table_name)
Delete the rows of the table whose primary keys match the rows of the dataframe. Be careful with column naming and case sensitivity.
- Parameters
dataframe – (DataFrame) The dataframe containing the primary keys of the rows you would like to delete
schema_table_name – (str) The Splice Machine table, in the format “schema.table”
- Returns
None
-
deleteWithRdd(rdd, schema, schema_table_name)
Delete the rows of the table whose primary keys match the rows of the RDD. Be careful with column naming and case sensitivity.
- Parameters
rdd – (RDD) The RDD containing the primary keys you would like to delete from the table
schema – (StructType) The schema of the rows in the RDD
schema_table_name – (str) The Splice Machine table, in the format “schema.table”
- Returns
None
-
df(sql, to_lower=False)
Return a Spark DataFrame from the results of a Splice Machine SQL query.
- Example
df = splice.df('SELECT * FROM MYSCHEMA.TABLE1 WHERE COL2 > 3')
- Parameters
sql – (str) SQL query (e.g. SELECT * FROM table1 WHERE col2 > 3)
to_lower – (bool) Whether to convert the column names of the resulting DataFrame to lowercase. Default False
- Returns
(Dataframe) A Spark DataFrame containing the results
-
dropTable(schema_and_or_table_name, table_name=None)
Drop the specified table.
- Example
splice.dropTable('schemaName.tableName') # or splice.dropTable('schemaName', 'tableName')
- Parameters
schema_and_or_table_name – (str) Pass the schema name in this param when passing the table_name param, or pass schemaName.tableName in this param without passing the table_name param
table_name – (optional) (str) Table Name, used when schema_and_or_table_name contains only the schema name
- Returns
None
-
dropTableIfExists(schema_table_name, table_name=None)
Drop a table if it exists.
- Example
splice.dropTableIfExists('schemaName.tableName') # or splice.dropTableIfExists('schemaName', 'tableName')
- Parameters
schema_table_name – (str) Pass the schema name in this param when passing the table_name param, or pass schemaName.tableName in this param without passing the table_name param
table_name – (optional) (str) Table Name, used when schema_table_name contains only the schema name
- Returns
None
-
execute(query_string)
Execute a query over JDBC.
- Example
splice.execute('DELETE FROM TABLE1 WHERE col2 > 3')
- Parameters
query_string – (str) SQL query (e.g. SELECT * FROM table1 WHERE col2 > 3)
- Returns
None
-
executeUpdate(query_string)
Execute a DML query (UPDATE, DELETE, DROP, etc.).
- Example
splice.executeUpdate('DROP TABLE table1')
- Parameters
query_string – (str) SQL query (e.g. DROP TABLE table1)
- Returns
None
-
export(dataframe, location, compression=False, replicationCount=1, fileEncoding=None, fieldSeparator=None, quoteCharacter=None)
Export a DataFrame as CSV.
- Parameters
dataframe – (DataFrame) The data to export
location – (str) Destination directory
compression – (bool) Whether to compress the output. Default False
replicationCount – (int) Replication factor used for the HDFS write. Default 1
fileEncoding – (str) File encoding, or None for the default (UTF-8)
fieldSeparator – (str) Field separator, or None for the default, a comma (,)
quoteCharacter – (str) Quote character, or None for the default, a double quote (")
- Returns
None
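The fieldSeparator and quoteCharacter defaults described above behave like standard CSV writing. The sketch below uses Python's csv module to show how a None separator or quote character falls back to a comma or double quote, and how fields containing the separator get quoted. export_csv is a hypothetical helper that writes to a string, not to HDFS.

```python
import csv
import io
from typing import Iterable, Optional, Sequence


def export_csv(rows: Iterable[Sequence[object]],
               field_separator: Optional[str] = None,
               quote_character: Optional[str] = None) -> str:
    """Hypothetical helper: write rows as CSV text, using ',' and '"'
    when the separator or quote character is None."""
    buf = io.StringIO()
    writer = csv.writer(buf,
                        delimiter=field_separator or ",",
                        quotechar=quote_character or '"',
                        lineterminator="\n")
    writer.writerows(rows)  # fields containing the separator are quoted
    return buf.getvalue()


text = export_csv([[1, "a,b"], [2, "c"]])
print(text, end="")
# 1,"a,b"
# 2,c
```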
-
exportBinary(dataframe, location, compression, e_format='parquet')
Export a DataFrame in a binary format.
- Parameters
dataframe – (DataFrame) The data to export
location – (str) Destination directory
compression – (bool) Whether to compress the output
e_format – (str) Binary format to use; currently only 'parquet' is supported. Default 'parquet'
- Returns
None
-
fileToTable(file_path, schema_table_name, primary_keys=None, drop_table=False, **pandas_args)
Load a file from the local filesystem or from a remote location, create a new table (or recreate an existing one), and load the data from the file into it. Any file_path that pandas can read should work here.
- Parameters
file_path – (str) The path of the file to load
schema_table_name – (str) The schema.table name
primary_keys – (List[str]) Primary keys for the table. Default None
drop_table – (bool) Whether to drop the table if it already exists. If False and the table already exists, the function fails. Default False
pandas_args – Extra keyword arguments passed to pd.read_csv. Any parameter accepted by pd.read_csv works here
- Returns
None
-
getSchema(schema_table_name)
Return the schema of a table via JDBC.
- Parameters
schema_table_name – (str) Full table name in the format “schema.table”
- Returns
(StructType) PySpark StructType representation of the table
-
insert(dataframe, schema_table_name, to_upper=True, create_table=False)
Insert a dataframe into a table (schema.table).
- Parameters
dataframe – (DataFrame) The dataframe you would like to insert
schema_table_name – (str) The table into which you would like to insert the DataFrame
to_upper – (bool) Whether the dataframe columns should be converted to uppercase before table creation. If False, the table is created with lowercase columns. Default True
create_table – (bool) If True and the table does not exist at the time of the call, the table is created first. Default False
- Returns
None
-
insertRdd(rdd, schema, schema_table_name)
Insert an RDD into a table (schema.table).
- Parameters
rdd – (RDD) The RDD you would like to insert
schema – (StructType) The schema of the rows in the RDD
schema_table_name – (str) The table in which you would like to insert the RDD
- Returns
None
-
insertRddWithStatus(rdd, schema, schema_table_name, statusDirectory, badRecordsAllowed)
Insert an RDD into a table (schema.table) while tracking and limiting the records that fail to insert. With statusDirectory and badRecordsAllowed set, records that fail, such as rows with duplicate primary keys, are written to a bad-records file. If badRecordsAllowed is -1, all bad records are written to the status directory.
- Parameters
rdd – (RDD) The RDD you would like to insert
schema – (StructType) The schema of the rows in the RDD
schema_table_name – (str) The table into which you would like to insert the RDD
statusDirectory – (str) The status directory where the bad-records file will be created
badRecordsAllowed – (int) The number of bad records allowed; -1 for unlimited
- Returns
None
-
insertWithStatus(dataframe, schema_table_name, statusDirectory, badRecordsAllowed)
Insert a dataframe into a table (schema.table) while tracking and limiting the records that fail to insert. With statusDirectory and badRecordsAllowed set, records that fail, such as rows with duplicate primary keys, are written to a bad-records file. If badRecordsAllowed is -1, all bad records are written to the status directory.
- Parameters
dataframe – (DataFrame) The dataframe you would like to insert
schema_table_name – (str) The table into which you would like to insert the dataframe
statusDirectory – (str) The status directory where the bad-records file will be created
badRecordsAllowed – (int) The number of bad records allowed; -1 for unlimited
- Returns
None
-
internalDf(query_string)
SQL-to-DataFrame translation (lazy). Runs the query inside Splice Machine and sends the results to the Spark Adapter app.
- Parameters
query_string – (str) SQL query (e.g. SELECT * FROM table1 WHERE col2 > 3)
- Returns
(DataFrame) A PySpark DataFrame containing the result of query_string
-
internalRdd(schema_table_name, column_projection=None)
Map a Splice Machine table, with an optional column projection, to an RDD. Runs the projection inside Splice Machine and sends the results to the Spark Adapter app as an RDD.
- Parameters
schema_table_name – (str) The table to access
column_projection – (list of strings) Names of selected columns
- Returns
(RDD[Row]) the result of the projection
-
mergeInto(dataframe, schema_table_name)
Rows in the dataframe whose primary key is not in schema_table_name are inserted into the table; rows whose primary key is already in schema_table_name are used to update the table.
Unlike upsert, this implementation allows triggers to work.
- Parameters
dataframe – (Dataframe) The dataframe you would like to merge in
schema_table_name – (str) The table in which you would like to merge in the dataframe
- Returns
None
-
mergeIntoWithRdd(rdd, schema, schema_table_name)
Rows in the RDD whose primary key is not in schema_table_name are inserted into the table; rows whose primary key is already in schema_table_name are used to update the table.
Unlike upsertWithRdd, this implementation allows triggers to work.
- Parameters
rdd – (RDD) The RDD you would like to merge in
schema – (StructType) The schema of the rows in the RDD
schema_table_name – (str) The table in which you would like to merge in the RDD
- Returns
None
-
pandasToSpark(pdf)
Convert a Pandas DataFrame to a Spark DataFrame, handling Pandas NaN values if the conversion fails. Spark cannot handle Pandas NaN values in string columns (ironically, it treats NaN as a number), so the occurrences are replaced with a temporary value and converted back to null once the data is a Spark DataFrame.
- Parameters
pdf – (pandas.DataFrame) The Pandas DataFrame
- Returns
(DataFrame) The Spark DataFrame
-
rdd(schema_table_name, column_projection=None)
Map a Splice Machine table, with an optional column projection, to an RDD.
- Parameters
schema_table_name – (str) The table to access
column_projection – (list of strings) Names of selected columns
- Returns
(RDD[Row]) the result of the projection
-
replaceDataframeSchema(dataframe, schema_table_name)
Return a DataFrame whose column names are replaced with the matching case from the database table.
- Parameters
dataframe – (Dataframe) A dataframe with column names to convert
schema_table_name – (str) The schema.table with the correct column cases to pull from the database
- Returns
(DataFrame) A Spark DataFrame with the replaced schema
-
splitAndInsert(dataframe, schema_table_name, sample_fraction)
Sample the dataframe, split the table based on the sample, and insert the DataFrame into schema.table. This corresponds to an INSERT INTO ... SELECT statement.
- Parameters
dataframe – (DataFrame) Input data
schema_table_name – (str) Full table name in the format “schema.table”
sample_fraction – (float) A value between 0 and 1 specifying the fraction of the DataFrame to sample when determining the splits. For example, specify 0.005 to sample 0.5% of the data.
- Returns
None
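To show what sampling "to determine the splits" can mean, here is a generic sketch: sample a fraction of the keys, sort the sample, and take evenly spaced values as split points. This is a common technique chosen for illustration, not Splice Machine's actual splitting logic; choose_split_keys and num_regions are hypothetical.

```python
import random
from typing import Iterable, List


def choose_split_keys(keys: Iterable[int], sample_fraction: float,
                      num_regions: int, seed: int = 0) -> List[int]:
    """Generic sketch: pick num_regions - 1 split keys from a random sample."""
    if not 0 < sample_fraction <= 1:
        raise ValueError("sample_fraction must be in (0, 1]")
    rng = random.Random(seed)  # seeded so the sketch is reproducible
    sample = sorted(k for k in keys if rng.random() < sample_fraction)
    if not sample:
        return []
    step = len(sample) / num_regions
    # Evenly spaced positions in the sorted sample approximate the quantiles
    return [sample[int(i * step)] for i in range(1, num_regions)]


# 0.005 samples roughly 0.5% of the rows, as in the parameter description above
splits = choose_split_keys(range(1_000_000), sample_fraction=0.005, num_regions=4)
print(len(splits), splits == sorted(splits))  # 3 True
```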
-
tableExists(schema_and_or_table_name, table_name=None)
Check whether a table exists.
- Example
splice.tableExists('schemaName.tableName') # or splice.tableExists('schemaName', 'tableName')
- Parameters
schema_and_or_table_name – (str) Pass the schema name in this param when passing the table_name param, or pass schemaName.tableName in this param without passing the table_name param
table_name – (optional) (str) Table Name, used when schema_and_or_table_name contains only the schema name
- Returns
(bool) whether or not the table exists
-
toLower(dataframe)
Return a DataFrame with all column names in lowercase.
- Parameters
dataframe – (Dataframe) The dataframe to convert to lowercase
-
toUpper(dataframe)
Return a DataFrame with all column names in uppercase.
- Parameters
dataframe – (Dataframe) The dataframe to convert to uppercase
-
truncateTable(schema_table_name)
Truncate a table.
- Parameters
schema_table_name – (str) The full name of the table to truncate, in the format “schema.table_name”
- Returns
None
-
update(dataframe, schema_table_name)
Update a table (schema.table) using the data in a dataframe. The primary key columns are required to identify the rows; any other columns provided are updated in those rows.
- Parameters
dataframe – (DataFrame) The dataframe you would like to update the table with
schema_table_name – (str) The Splice Machine table, in the format “schema.table”
- Returns
None
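The update rule above, where the key columns identify a row and only the other supplied columns change, can be modeled in plain Python with lists of dicts. update_rows is a hypothetical helper; ignoring update rows whose key is not in the table is an assumption (upsert would insert them instead).

```python
from typing import Dict, List, Sequence

Row = Dict[str, object]


def update_rows(table: List[Row], updates: List[Row],
                primary_keys: Sequence[str]) -> List[Row]:
    """Hypothetical model of update(): overwrite only the supplied columns of
    rows whose primary key matches. Unknown keys are ignored (an assumption)."""
    result = [dict(r) for r in table]  # copy so the input is left untouched
    index = {tuple(r[k] for k in primary_keys): r for r in result}
    for upd in updates:
        key = tuple(upd[k] for k in primary_keys)  # key columns are required
        if key in index:
            index[key].update(upd)  # other supplied columns overwrite old values
    return result


table = [{"ID": 1, "QTY": 5, "NOTE": "x"}, {"ID": 2, "QTY": 8, "NOTE": "y"}]
updated = update_rows(table, [{"ID": 2, "QTY": 9}, {"ID": 3, "QTY": 1}], ["ID"])
print(updated)
# [{'ID': 1, 'QTY': 5, 'NOTE': 'x'}, {'ID': 2, 'QTY': 9, 'NOTE': 'y'}]
```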
-
updateWithRdd(rdd, schema, schema_table_name)
Update a table (schema.table) using the data in an RDD. The primary key columns are required to identify the rows; any other columns provided are updated in those rows.
- Parameters
rdd – (RDD) The RDD you would like to use for updating the table
schema – (StructType) The schema of the rows in the RDD
schema_table_name – (str) The Splice Machine table, in the format “schema.table”
- Returns
None
-
upsert(dataframe, schema_table_name)
Upsert the data from a dataframe into a table (schema.table). If triggers fail when calling upsert, use mergeInto instead.
- Parameters
dataframe – (DataFrame) The dataframe you would like to upsert
schema_table_name – (str) The table into which you would like to upsert the dataframe
- Returns
None
-
upsertWithRdd(rdd, schema, schema_table_name)
Upsert the data from an RDD into a table (schema.table). If triggers fail when calling upsertWithRdd, use mergeIntoWithRdd instead.
- Parameters
rdd – (RDD) The RDD you would like to upsert
schema – (StructType) The schema of the rows in the RDD
schema_table_name – (str) The table in which you would like to upsert the RDD
- Returns
None
-