public class PythonServiceConfig extends Object implements Serializable
mapUsingPython
stage.
Hazelcast Jet expects you to have a Python project in a local directory.
It must contain the definition of a transform_list()
function
that receives a list of strings and returns a list of strings of the
same size, with a one-to-one mapping between input and output elements.
Here's a simple example of a function that transforms every input
string by prepending "echo-"
to it:
def transform_list(input_list):
return ["echo-%s" % i for i in input_list]
If you have a very simple setup with everything in a single Python file,
you can use setHandlerFile(java.lang.String)
. Let's say you saved the above
Python code to a file named echo.py
. You can use it from Jet
like this:
StreamStage<String> inputStage = createInputStage();
StreamStage<String> outputStage = inputStage.apply(
mapUsingPython(new PythonServiceConfig()
.setHandlerFile("path/to/echo.py")));
In more complex setups you can tell Jet the location of your project
directory and the name of the Python module containing transform_list()
. You can
also use a different name for the
function.
Jet uploads the entire directory to the cluster, creates one or more Python processes on each member, and sends the pipeline data through your function. The number of processes is controlled by the local parallelism of the Python mapping stage.
Jet recognizes these special files in the base directory:
requirements.txt
is assumed to list the
dependencies of your Python code. Jet will automatically install
them to a job-local virtual environment. You can also install the
modules to the Jet servers' global Python environment in order to speed
up job initialization. Jet reuses the global modules and adds the
missing ones.
init.sh
is assumed to be a shell script that Jet will run when
initializing the job.
cleanup.sh
is assumed to be a shell script that Jet will run
when completing the job.
To use this stage in a Hazelcast Jet cluster, Python must be installed
on every cluster member. Jet supports Python versions 3.5-3.7. If the
code has dependencies on non-standard Python modules, these must either
be pre-installed or the member machines must have access to the public
internet so that Jet can download and install them. A third option is
to write init.sh
that uses a different way of installing the
dependencies. In that case make sure not to use the standard filename
requirements.txt
, which Jet uses automatically.
The Python mapping stage produces log output at the FINE
level
under the com.hazelcast.jet.python
log category. This includes
all the output from launched subprocesses.
Constructor and Description |
---|
PythonServiceConfig() |
Modifier and Type | Method and Description |
---|---|
File |
baseDir()
Returns the Python base directory.
|
BiFunctionEx<String,Integer,? extends io.grpc.ManagedChannelBuilder<?>> |
channelFn()
Returns the channel function, see
setChannelFn(com.hazelcast.function.BiFunctionEx<java.lang.String, java.lang.Integer, ? extends io.grpc.ManagedChannelBuilder<?>>) . |
File |
handlerFile()
Returns the Python handler file.
|
String |
handlerFunction()
Returns the name of the handler
function.
|
String |
handlerModule()
Returns the handler module name.
|
PythonServiceConfig |
setBaseDir(String baseDir)
Sets the base directory where the Python files reside.
|
PythonServiceConfig |
setChannelFn(BiFunctionEx<String,Integer,? extends io.grpc.ManagedChannelBuilder<?>> channelFn)
Sets the channel function.
|
PythonServiceConfig |
setHandlerFile(String handlerFile)
Sets the Python handler file.
|
PythonServiceConfig |
setHandlerFunction(String handlerFunction)
Overrides the default name of the Python function that transforms Jet
pipeline data.
|
PythonServiceConfig |
setHandlerModule(String handlerModule)
Sets the name of the Python module that has the function that
transforms Jet pipeline data.
|
void |
validate()
Validates the configuration and throws an exception if a mandatory
config option is missing.
|
public void validate()
PythonTransforms.mapUsingPython(com.hazelcast.jet.python.PythonServiceConfig)
.@Nullable public File baseDir()
@Nonnull public PythonServiceConfig setBaseDir(@Nonnull String baseDir)
handler module
to
identify the location of the handler function (named transform_list()
by convention).
If all you need to deploy to Jet is in a single file, you can call setHandlerFile(java.lang.String)
instead.
@Nullable public File handlerFile()
@Nonnull public PythonServiceConfig setHandlerFile(@Nonnull String handlerFile)
setBaseDir(java.lang.String)
instead.@Nullable public String handlerModule()
@Nonnull public PythonServiceConfig setHandlerModule(@Nonnull String handlerModule)
@Nonnull public String handlerFunction()
transform_list
.@Nonnull public PythonServiceConfig setHandlerFunction(@Nonnull String handlerFunction)
setHandlerModule(java.lang.String)
, must take a single argument that is a list of
strings, and return another list of strings which has the results of
transforming each item in the input list. There must be a strict
one-to-one match between the input and output lists.@Nonnull public BiFunctionEx<String,Integer,? extends io.grpc.ManagedChannelBuilder<?>> channelFn()
setChannelFn(com.hazelcast.function.BiFunctionEx<java.lang.String, java.lang.Integer, ? extends io.grpc.ManagedChannelBuilder<?>>)
.@Nonnull public PythonServiceConfig setChannelFn(@Nonnull BiFunctionEx<String,Integer,? extends io.grpc.ManagedChannelBuilder<?>> channelFn)
ManagedChannelBuilder
. You can use this to configure the channel, for
example to configure the maximum message size etc.
The default value is NettyChannelBuilder::forAddress
.
Copyright © 2022 Hazelcast, Inc.. All rights reserved.