DDF operations
Module contents
class ddf_library.ddf.DDF(**kwargs)
Bases: ddf_library.bases.ddf_base.DDFSketch
Distributed DataFrame Handler.
add_column(data2, suffixes=None)
Merge two DDFs, column-wise.
Is it a Lazy function: No
Parameters: - data2 – The second DDF;
- suffixes – Suffixes to use for duplicated column names (default, ['_l', '_r']);
Returns: DDF
Example: >>> ddf1.add_column(ddf2)
balancer(forced=False)
Repartitions the data in order to balance it between the nodes.
Returns: DDF
Example: >>> ddf1.balancer(forced=True)
cache()
Currently, this is only an alias for persist().
Returns: DDF
Example: >>> ddf1.cache()
cast(column, cast)
Change the data type of one or more columns.
Is it a Lazy function: Yes
Parameters: - column – String or list of strings with the columns to cast;
- cast – String or list of strings with the target types: 'integer', 'string', 'decimal', 'date', 'date/time';
Returns: DDF
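Example (an illustrative sketch; 'col_1' is a hypothetical column name): >>> ddf1.cast(column='col_1', cast='integer')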
columns()
Returns the column names of the current DDF.
Returns: A list of strings
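Example (illustrative): >>> print(ddf1.columns())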
correlation(col1, col2)
Calculates the Pearson correlation coefficient of two columns.
When one of the standard deviations is zero, the correlation is undefined (NaN).
Parameters: - col1 – The name of the first column
- col2 – The name of the second column
Returns: The Pearson correlation coefficient
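Example (illustrative column names): >>> ddf1.correlation('col_1', 'col_2')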
count_rows(total=True)
Returns the number of rows in this DDF.
Parameters: total – Show the total number (default) or the number per fragment.
Returns: integer
Example: >>> print(ddf1.count_rows())
covariance(col1, col2)
Calculates the sample covariance of the given columns, specified by their names, as a double value.
Parameters: - col1 – The name of the first column
- col2 – The name of the second column
Returns: The sample covariance
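Example (illustrative column names): >>> ddf1.covariance('col_1', 'col_2')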
cross_join(data2)
Returns the Cartesian product with another DDF.
Parameters: data2 – Right side of the Cartesian product;
Returns: DDF
Example: >>> ddf1.cross_join(ddf2)
cross_tab(col1, col2)
Computes a pair-wise frequency table of the given columns, also known as a contingency table. The number of distinct values for each column should be less than 1e4. At most 1e6 non-zero pair frequencies will be returned.
Is it a Lazy function: No
Parameters: - col1 – The name of the first column
- col2 – The name of the second column
Returns: DDF
Example: >>> ddf1.cross_tab(col1='col_1', col2='col_2')
crst_transform(lat_col, lon_col, src_epsg, dst_epsg, lat_alias=None, lon_alias=None)
Given a source EPSG code and a target EPSG code, converts the Spatial Reference System / Coordinate Reference System.
Parameters: - lat_col – Latitude column name;
- lon_col – Longitude column name;
- src_epsg – Coordinate Reference System used in the source points;
- dst_epsg – Target Coordinate Reference System;
- lat_alias – Latitude column alias (default, replace the input);
- lon_alias – Longitude column alias (default, replace the input);
Returns: DDF
Example: >>> ddf1.crst_transform('latitude', 'longitude',
>>>                      src_epsg=4326, dst_epsg=32633)
describe(columns=None)
Computes basic statistics for numeric and string columns. These include count, mean, number of NaNs, stddev, min, and max.
Is it a Lazy function: No
Parameters: columns – A list of columns; if no columns are given, this function computes statistics for all numerical or string columns;
Returns: A pandas DataFrame
Example: >>> ddf1.describe(['col_1'])
distinct(cols, opt=True)
Returns a new DDF containing the distinct rows of this DDF.
Is it a Lazy function: No
Parameters: - cols – Subset of columns;
- opt – If True, tries to reduce the partial output size before the shuffle;
Returns: DDF
Example: >>> ddf1.distinct('col_1')
drop(columns)
Removes columns from the DDF.
Is it a Lazy function: Yes
Parameters: columns – A list of column names to be removed;
Returns: DDF
Example: >>> ddf1.drop(['col_1', 'col_2'])
drop_duplicates(cols)
Alias for distinct().
Example: >>> ddf1.drop_duplicates('col_1')
dropna(subset=None, mode='REMOVE_ROW', how='any', thresh=None)
Removes rows or columns with missing values.
Is it a Lazy function: Yes if mode is "REMOVE_ROW", otherwise No
Parameters: - subset – A list of attributes to evaluate;
- mode – "REMOVE_ROW" to remove entire rows (default) or "REMOVE_COLUMN" to remove columns.
- thresh – int, default None. If specified, drop rows that have fewer than thresh non-null values. This overrides the how parameter.
- how – 'any' or 'all'. If 'any', drop a row if it contains any nulls. If 'all', drop a row only if all its values are null.
Returns: DDF
Example: >>> ddf1.dropna(['col_1'], mode='REMOVE_ROW')
dtypes()
Returns the dtype of each column in the current DDF.
Returns: A list
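Example (illustrative): >>> print(ddf1.dtypes())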
except_all(data2)
Returns a new DDF containing the rows that are in the first frame but not in the second one, while preserving duplicates. This is equivalent to EXCEPT ALL in SQL.
Is it a Lazy function: No
Parameters: data2 – The second DDF;
Returns: DDF
Example: >>> ddf1.except_all(ddf2)
explode(column)
Returns a new row for each element in the given array.
Is it a Lazy function: Yes
Parameters: column – Column name to be unnested;
Returns: DDF
Example: >>> ddf1.explode('col_1')
export_ddf()
Exports the DDF data.
Returns: A list of pandas DataFrames
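Example (illustrative): >>> dfs = ddf1.export_ddf()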
fillna(subset=None, mode='VALUE', value=None)
Replaces missing values by the mean, median, mode, or a fixed value.
Is it a Lazy function: Yes, but only if mode is "VALUE"
Parameters: - subset – A list of attributes to evaluate;
- mode – Action in case of missing values: "VALUE" to replace by the value parameter (default); "MEDIAN" to replace by the median value; "MODE" to replace by the mode value; "MEAN" to replace by the mean value;
- value – The replacement value (only if mode is "VALUE")
Returns: DDF
Example: >>> ddf1.fillna(['col_1'], value=42)
filter(expr)
Filters rows using the given condition.
Is it a Lazy function: Yes
Parameters: expr – A filtering expression;
Returns: DDF
See also
Visit this link for more information about query options.
Example: >>> ddf1.filter("(col_1 == 'male') and (col_3 > 42)")
freq_items(col, support=0.01)
Finds frequent items in columns, possibly with false positives, using the frequent element count algorithm described in http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou.
Is it a Lazy function: No
Parameters: - col – Names of the columns to calculate frequent items
- support – The frequency with which to consider an item ‘frequent’. Default is 1%. The support must be greater than 1e-4.
Returns: DDF
Example: >>> ddf1.freq_items(col='col_1', support=0.01)
geo_within(shp_object, lat_col, lon_col, polygon, attributes=None, suffix='_shp')
Returns the sector that each point belongs to.
Is it a Lazy function: No
Parameters: - shp_object – The DDF with the shapefile information;
- lat_col – Column that represents the Latitude field in the data;
- lon_col – Column that represents the Longitude field in the data;
- polygon – Field in shp_object where the coordinates of each sector are stored;
- attributes – List of attributes to retrieve from the shapefile; leave empty for all (default, None);
- suffix – Suffix for the shapefile attributes (default, '_shp');
Returns: DDF
Example: >>> ddf2.geo_within(ddf1, 'LATITUDE', 'LONGITUDE', 'points')
group_by(group_by)
Computes aggregates and returns the result as a DDF.
Is it a Lazy function: No
Parameters: group_by – A list of columns to group by;
Returns: A GroupedDDF with a set of methods for aggregations on a DDF
Example: >>> ddf1.group_by(group_by=['col_1']).mean(['col_2']).first(['col_2'])
hash_partition(columns, nfrag=None)
Hash partitioning is a partitioning technique in which data is stored separately in different fragments according to a hash function.
Is it a Lazy function: No
Parameters: - columns – Columns to be used as the key of the hash function;
- nfrag – Number of fragments (default, keep the input nfrag).
Example: >>> ddf2 = ddf1.hash_partition(columns=['col1', 'col2'])
intersect(data2)
Returns a new DDF containing the rows that are in both DDFs. This is equivalent to INTERSECT in SQL.
Is it a Lazy function: No
Parameters: data2 – The second DDF;
Returns: DDF
Example: >>> ddf2.intersect(ddf1)
intersect_all(data2)
Returns a new DDF containing the rows that are in both DDFs, while preserving duplicates. This is equivalent to INTERSECT ALL in SQL.
Is it a Lazy function: No
Parameters: data2 – The second DDF;
Returns: DDF
Example: >>> ddf2.intersect_all(ddf1)
join(data2, key1=None, key2=None, mode='inner', suffixes=None, keep_keys=False, case=True)
Joins two DDFs using the given join keys.
Is it a Lazy function: No
Parameters: - data2 – The second DDF;
- key1 – List of keys of the first DDF;
- key2 – List of keys of the second DDF;
- mode – How to handle the operation of the two objects: {'left', 'right', 'inner'}, default 'inner';
- suffixes – A list of suffixes to be used for overlapping columns. Default is ['_l', '_r'];
- keep_keys – True to keep the keys of the second DDF (default is False);
- case – True to treat the keys as case sensitive;
Returns: DDF
Example: >>> ddf1.join(ddf2, key1=['col_1'], key2=['col_1'], mode='inner')
kolmogorov_smirnov_one_sample(col, distribution='norm', mode='asymp', args=None)
Performs the Kolmogorov-Smirnov test for goodness of fit. This implementation of the Kolmogorov-Smirnov test is a two-sided test for the null hypothesis that the sample is drawn from a continuous distribution.
Is it a Lazy function: No
Parameters: - col – Sample column name;
- distribution – Name of the distribution (default is 'norm');
- mode – Defines the distribution used for calculating the p-value: 'approx' to use an approximation to the exact distribution, or 'asymp' to use the asymptotic distribution of the test statistic;
- args – A tuple of distribution parameters. Default is (0, 1);
Returns: KS statistic and two-tailed p-value
See also
Visit this link to see all supported distributions.
Note
The KS statistic is the absolute max distance between the CDFs of the two samples. The closer this number is to 0 the more likely it is that the two samples were drawn from the same distribution.
The p-value returned by the KS test has the same interpretation as other p-values. You reject the null hypothesis that the two samples were drawn from the same distribution if the p-value is less than your significance level.
Example: >>> ddf1.kolmogorov_smirnov_one_sample(col='col_1')
map(f, alias)
Applies a function to each row of this DDF.
Is it a Lazy function: Yes
Parameters: - f – Lambda function that will take each element of this data set as a parameter;
- alias – Name of the column in which to store the result;
Returns: DDF
Example: >>> from ddf_library.columns import col
>>> from ddf_library.types import DataType
>>> ddf1.map(col('col_0').cast(DataType.INT), 'col_0_new')
num_of_partitions()
Returns the number of data partitions (task parallelism).
Returns: integer
Example: >>> print(ddf1.num_of_partitions())
persist()
Computes the current flow and keeps the result on disk.
Returns: DDF
Example: >>> ddf1.persist()
range_partition(columns, ascending=None, nfrag=None)
Range partitioning is a partitioning technique in which ranges of data are stored separately in different fragments.
Is it a Lazy function: No
Parameters: - columns – Columns to be used as the key;
- ascending – Order of each key (True for ascending order);
- nfrag – Number of fragments (default, keep the input nfrag).
Example: >>> ddf2 = ddf1.range_partition(columns=['col1', 'col2'],
>>>                             ascending=[True, False])
rename(old_column, new_column)
Returns a new DDF by renaming an existing column. This is a no-op if the schema does not contain the given column name.
Is it a Lazy function: Yes
Parameters: - old_column – String or list of strings with the columns to rename;
- new_column – String or list of strings with the new names.
Returns: DDF
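Example (illustrative column names): >>> ddf1.rename('col_1', 'col_1_new')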
repartition(nfrag=-1, distribution=None)
Repartitions the distributed data into a fixed number of partitions or according to a distribution list.
Parameters: - nfrag – Optional; if used, the data will be partitioned into nfrag fragments.
- distribution – Optional; a list of integers where each element represents the amount of data at that index.
Returns: DDF
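Example (an illustrative sketch; the fragment count is arbitrary): >>> ddf2 = ddf1.repartition(nfrag=4)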
replace(replaces, subset=None, regex=False)
Replaces one or more values with new ones.
Is it a Lazy function: Yes
Parameters: - replaces – dict-like to_replace mapping;
- subset – A list of columns to apply the replacement to (default is None, which applies to all columns);
- regex – Whether to interpret to_replace and/or value as regular expressions. If True, replaces must be a dictionary.
Returns: DDF
Example: >>> ddf1.replace({0: 'No', 1: 'Yes'}, subset=['col_1'])
sample(value=None, seed=None)
Returns a sampled subset.
Is it a Lazy function: No
Parameters: - value – None to sample a random number of records (default), or an integer or float N to sample N random records;
- seed – Optional; seed for the random operation.
Returns: DDF
Example: >>> ddf1.sample(10)   # sample 10 rows
>>> ddf1.sample(0.5)  # sample half of the elements
>>> ddf1.sample()     # a random sample
schema()
Returns a schema table where each row contains a column name of the current DDF and its data type.
Returns: A pandas DataFrame
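Example (illustrative): >>> print(ddf1.schema())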
select(columns)
Projects a set of columns and returns a new DDF.
Is it a Lazy function: Yes
Parameters: columns – A list of column names (strings);
Returns: DDF
Example: >>> ddf1.select(['col_1', 'col_2'])
select_expression(*exprs)
Projects a set of SQL expressions and returns a new DDF. This is a variant of select() that accepts SQL expressions.
Is it a Lazy function: Yes
Parameters: exprs – SQL expressions.
Returns: DDF
Note
These operations are supported by select_expression:
- Arithmetic operations except for the left shift (<<) and right shift (>>) operators, e.g., ‘col’ + 2 * pi / s ** 4 % 42 - the_golden_ratio
- list and tuple literals, e.g., [1, 2] or (1, 2)
- Math functions: sin, cos, exp, log, abs, log10, …
- You must explicitly reference any local variable that you want to use in an expression by placing the @ character in front of the name.
- This Python syntax is not allowed:
  - Function calls other than math functions.
  - is/is not operations
  - if expressions
  - lambda expressions
  - list/set/dict comprehensions
  - Literal dict and set expressions
  - yield expressions
  - Generator expressions
  - Boolean expressions consisting of only scalar values
  - Statements: neither simple nor compound statements are allowed.
See also
Visit this link for more information about eval options.
Example: >>> ddf1.select_expression('col1 = age * 2', "abs(age)")
show(n=20)
Prints the DDF contents as a concatenated pandas DataFrame.
Parameters: n – The number of rows in the result (default is 20);
Returns: DataFrame printed to stdout
Example: >>> ddf1.show()
sort(cols, ascending=None)
Returns a DDF sorted by the specified column(s).
Is it a Lazy function: No
Parameters: - cols – A list of columns to sort by;
- ascending – A list indicating whether the sort order is ascending (True) for each column (default, True);
Returns: DDF
Example: >>> ddf1.sort(['col_1', 'col_2'], ascending=[True, False])
split(percentage=0.5, seed=None)
Randomly splits a DDF into two DDFs.
Is it a Lazy function: No
Parameters: - percentage – Percentage to split the data (default, 0.5);
- seed – Optional; seed for a deterministic random operation;
Returns: DDF
Example: >>> ddf2a, ddf2b = ddf1.split(0.5)
subtract(data2)
Returns a new DDF containing the rows that are in the first frame but not in the second one. This is equivalent to EXCEPT in SQL.
Is it a Lazy function: No
Parameters: data2 – The second DDF;
Returns: DDF
Example: >>> ddf1.subtract(ddf2)
take(num)
Returns the first num rows.
Is it a Lazy function: No
Parameters: num – Number of rows to retrieve;
Returns: DDF
Example: >>> ddf1.take(10)
to_df(columns=None, split=False)
Returns the DDF contents as a pandas DataFrame.
Parameters: - columns – Optional; a column name or list of column names;
- split – True to keep the data in partitions (default, False);
Returns: A pandas DataFrame
Example: >>> df = ddf1.to_df(['col_1', 'col_2'])
union(data2)
Combines this data set with another DDF. As is standard in SQL, this function resolves columns by position (not by name). Union can only be performed on tables with the same number of columns.
Is it a Lazy function: No
Parameters: data2 – The second DDF;
Returns: DDF
Example: >>> ddf1.union(ddf2)
union_by_name(data2)
Combines this data set with another DDF. This function resolves columns by name (not by position).
Is it a Lazy function: No
Parameters: data2 – The second DDF;
Returns: DDF
Example: >>> ddf1.union_by_name(ddf2)
unpersist()
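Example (illustrative; the method takes no arguments): >>> ddf1.unpersist()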
class ddf_library.bases.groupby.GroupedDDF
A set of methods for aggregations on a DDF, created by DDF.group_by().
The available aggregate functions are:
- avg: Computes the average value of each numeric column for each group;
- mean: Alias for avg;
- count: Counts the number of records for each group;
- first: Returns the first element of each group;
- last: Returns the last element of each group;
- max: Computes the max value of each numeric column for each group;
- min: Computes the min value of each numeric column for each group;
- sum: Computes the sum of each numeric column for each group;
- list: Returns a list of objects with duplicates;
- set: Returns a set of objects with duplicate elements eliminated
GroupedDDF.agg(**exprs)
Computes aggregates and returns the result as a DDF.
Parameters: exprs – Tuples in the form alias=('column name', function).
Example: >>> ddf1.group_by(['col_1']).agg(MIN=('col_2', 'min'),
>>>                              MAX=('col_3', 'max'))
GroupedDDF.avg(cols, alias=None)
Computes the average value of each numeric column for each group.
Parameters: - cols – String or a list of column names
- alias – String or a list of aliases
Example: >>> ddf1.group_by(group_by=['col_1']).avg(['col_2'])
GroupedDDF.count(cols, alias=None)
Counts the number of records for each group.
Parameters: - cols – String or a list of column names
- alias – String or a list of aliases
Example: >>> ddf1.group_by(group_by=['col_1']).count(['col_2'])
GroupedDDF.first(cols, alias=None)
Returns the first element of each group.
Parameters: - cols – String or a list of column names
- alias – String or a list of aliases
Example: >>> ddf1.group_by(group_by=['col_1']).first(['col_2'])
GroupedDDF.last(cols, alias=None)
Returns the last element of each group.
Parameters: - cols – String or a list of column names
- alias – String or a list of aliases
Example: >>> ddf1.group_by(group_by=['col_1']).last(['col_2'])
GroupedDDF.list(cols, alias=None)
Returns a list of objects with duplicates.
Parameters: - cols – String or a list of column names
- alias – String or a list of aliases
Example: >>> ddf1.group_by(group_by=['col_1']).list(['col_2'])
GroupedDDF.max(cols, alias=None)
Computes the max value of each numeric column for each group.
Parameters: - cols – String or a list of column names
- alias – String or a list of aliases
Example: >>> ddf1.group_by(group_by=['col_1']).max(['col_2'])
GroupedDDF.mean(cols, alias=None)
Alias for avg. Computes the average value of each numeric column for each group.
Parameters: - cols – String or a list of column names
- alias – String or a list of aliases
Example: >>> ddf1.group_by(group_by=['col_1']).mean(['col_2'])
GroupedDDF.min(cols, alias=None)
Computes the min value of each numeric column for each group.
Parameters: - cols – String or a list of column names
- alias – String or a list of aliases
Example: >>> ddf1.group_by(group_by=['col_1']).min(['col_2'])
GroupedDDF.set(cols, alias=None)
Returns a set of objects with duplicate elements eliminated.
Parameters: - cols – String or a list of column names
- alias – String or a list of aliases
Example: >>> ddf1.group_by(group_by=['col_1']).set(['col_2'])
GroupedDDF.sum(cols, alias=None)
Computes the sum of each numeric column for each group.
Parameters: - cols – String or a list of column names
- alias – String or a list of aliases
Example: >>> ddf1.group_by(group_by=['col_1']).sum(['col_2'])