Java连接Kettle资源库

Updated on with 0 views and 0 comments

简述

已知的连接资源库方式有两种,一种是通过读取XML配置连接,一种是在代码中动态绑定资源库

Java连接Kettle需要配置一个密码编码(password encoder)插件。该插件通过XML配置,需要将该XML文件放到classpath路径下,文件名为kettle-password-encoder-plugins.xml。Kettle源码中提供了这个XML的配置模板,位于下图所示的包中:

image.png

运行时绑定资源库

// Credentials passed to repository.connect(...); the values are the ones used throughout this article.
private static final String USERNAME = "admin";
private static final String PASSWORD = "admin";

/**
 * Connects to a database-backed Kettle repository that is described at runtime and prints
 * the direct children of the repository directory "/" to standard output.
 *
 * @param databaseMeta connection settings of the database that stores the repository; must not be null
 * @throws IllegalArgumentException if {@code databaseMeta} is null
 * @throws KettleException propagated from the Kettle calls made here (environment init, connect, directory load)
 */
public void getKettleJobList(com.core.datapipelines.entity.kettle.DatabaseMeta databaseMeta) throws KettleException {
    // An assert is skipped unless the JVM runs with -ea, so validate the argument explicitly.
    if (databaseMeta == null) {
        throw new IllegalArgumentException("databaseMeta must not be null");
    }

    KettleEnvironment.init();
    EnvUtil.environmentInit();

    // Copy the project entity's fields into Kettle's own DatabaseMeta.
    DatabaseMeta con = new DatabaseMeta(databaseMeta.getName(), databaseMeta.getType(),
            databaseMeta.getAccess(), databaseMeta.getHost(), databaseMeta.getDb(),
            databaseMeta.getPort(), databaseMeta.getUsername(), databaseMeta.getPassword());
    // The connection name is reused as both the repository id and the repository name.
    KettleDatabaseRepositoryMeta meta = new KettleDatabaseRepositoryMeta(
            databaseMeta.getName(), databaseMeta.getName(), "Transformation description", con);

    KettleDatabaseRepository repository = new KettleDatabaseRepository();
    repository.init(meta);
    repository.connect(USERNAME, PASSWORD);
    try {
        RepositoryDirectoryInterface tree = repository.loadRepositoryDirectoryTree();
        RepositoryDirectoryInterface directory = tree.findDirectory("/");
        List<RepositoryDirectoryInterface> children = directory.getChildren();
        for (RepositoryDirectoryInterface subObj : children) {
            System.out.println(subObj);
        }
    } finally {
        // Release the repository connection even when loading or listing fails.
        repository.disconnect();
    }
}

资源库信息描述

package com.core.datapipelines.entity.kettle;

import com.core.datapipelines.common.entity.BaseEntity;

/**
 * Entity holding the connection settings of one database: name, type, access mode,
 * host, database name, port, user name and password.
 *
 * <p>All setters except {@link #setAccess(String)} store the value with leading and
 * trailing whitespace removed via {@link String#trim()} (null stays null). The
 * all-arguments constructor stores every value exactly as given.
 */
public class DatabaseMeta extends BaseEntity<Long> {

    private String name;
    private String type;
    private String access;
    private String host;
    private String db;
    private String port;
    private String username;
    private String password;

    /** Creates an empty instance; every field keeps its default value. */
    public DatabaseMeta() {
    }

    /**
     * Creates a fully populated instance. Values are stored as passed in, without trimming.
     */
    public DatabaseMeta(Long id, String name, String type, String access, String host, String db, String port, String username, String password) {
        this.id = id;
        this.name = name;
        this.type = type;
        this.access = access;
        this.host = host;
        this.db = db;
        this.port = port;
        this.username = username;
        this.password = password;
    }

    /** Returns the trimmed form of {@code value}, or null when {@code value} is null. */
    private static String trimmed(String value) {
        if (value == null) {
            return null;
        }
        return value.trim();
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = trimmed(name);
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = trimmed(type);
    }

    public String getAccess() {
        return access;
    }

    /** Stores the access mode as given; unlike the other setters this one does not trim. */
    public void setAccess(String access) {
        this.access = access;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = trimmed(host);
    }

    public String getDb() {
        return db;
    }

    public void setDb(String db) {
        this.db = trimmed(db);
    }

    public String getPort() {
        return port;
    }

    public void setPort(String port) {
        this.port = trimmed(port);
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = trimmed(username);
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = trimmed(password);
    }
}

基础实体

package com.core.datapipelines.common.entity;

/**
 * Base class for entities that are identified by a single id value.
 *
 * @param <I> type of the identifier
 */
public class BaseEntity<I> {

    /** The identifier; public, so it can also be read and written directly. */
    public I id;

    /**
     * Replaces the identifier.
     *
     * @param id the new identifier
     */
    public void setId(I id) {
        this.id = id;
    }

    /**
     * @return the current identifier, or null if none has been assigned
     */
    public I getId() {
        return this.id;
    }
}

通过XML读取

Kettle有个配置文件,叫repositories.xml,资源库信息就配置在这个文件里。

代码如下:

  /**
   * Looks up a repository by name in the definitions read by {@code RepositoriesMeta.readData()},
   * connects to it and prints the direct children of the repository directory "/" to standard output.
   *
   * @param name name of the repository to look up
   * @throws KettleException if no repository with that name is found, or propagated from the Kettle calls made here
   */
  public void getRepositoryByXml(String name) throws KettleException {
      KettleEnvironment.init();
      EnvUtil.environmentInit();

      // readData() is the key call: it loads the repository definitions (repositories.xml).
      RepositoriesMeta repositoriesMeta = new RepositoriesMeta();
      repositoriesMeta.readData();
      RepositoryMeta repositoryMeta = repositoriesMeta.findRepository(name);
      if (repositoryMeta == null) {
          throw new KettleException( "Cannot find repository \"" + name + "\". Please make sure it is defined in your " + Const.getKettleUserRepositoriesFile() + " file" );
      }

      // Instantiate the repository implementation registered for this repository type.
      PluginRegistry registry = PluginRegistry.getInstance();
      Repository repository = registry.loadClass(RepositoryPluginType.class, repositoryMeta, Repository.class);
      repository.init(repositoryMeta);
      repository.connect(USERNAME, PASSWORD);
      try {
          RepositoryDirectoryInterface tree = repository.loadRepositoryDirectoryTree();
          RepositoryDirectoryInterface directory = tree.findDirectory("/");
          List<RepositoryDirectoryInterface> children = directory.getChildren();
          for (RepositoryDirectoryInterface subObj : children) {
              System.out.println(subObj);
          }
      } finally {
          // Release the repository connection even when loading or listing fails.
          repository.disconnect();
      }
  }

关键代码是

repositoriesMeta.readData();

该方法是Kettle源码中org.pentaho.di.repository.RepositoriesMeta类的方法,会读取repositories.xml中的配置信息。

配置模板

一般这个文件由kettle生成,没有的话可以参考下述文件内容

<?xml version="1.0" encoding="UTF-8"?>
<repositories>
  <connection>
    <name>kettle_repository</name>
    <server>11.1.0.132</server>
    <type>MYSQL</type>
    <access>Native</access>
    <database>kettle_repository</database>
    <port>3306</port>
    <username>kettle</username>
    <password>Encrypted 2be98afc80ecbbfaeac0fa8238a94abee</password>
    <servername/>
    <data_tablespace/>
    <index_tablespace/>
    <attributes>
      <attribute><code>EXTRA_OPTION_MYSQL.autoReconnect</code><attribute>true</attribute></attribute>
      <attribute><code>EXTRA_OPTION_MYSQL.characterEncoding</code><attribute>UTF-8</attribute></attribute>
      <attribute><code>EXTRA_OPTION_MYSQL.connectTimeout</code><attribute>120000000000000000</attribute></attribute>
      <attribute><code>EXTRA_OPTION_MYSQL.defaultFetchSize</code><attribute>500</attribute></attribute>
      <attribute><code>EXTRA_OPTION_MYSQL.rewriteBatchedStatements</code><attribute>true</attribute></attribute>
      <attribute><code>EXTRA_OPTION_MYSQL.socketTimeout</code><attribute>120000000000000000</attribute></attribute>
      <attribute><code>EXTRA_OPTION_MYSQL.useCompression</code><attribute>true</attribute></attribute>
      <attribute><code>EXTRA_OPTION_MYSQL.useCursorFetch</code><attribute>true</attribute></attribute>
      <attribute><code>EXTRA_OPTION_MYSQL.useServerPrepStmts</code><attribute>false</attribute></attribute>
      <attribute><code>FORCE_IDENTIFIERS_TO_LOWERCASE</code><attribute>N</attribute></attribute>
      <attribute><code>FORCE_IDENTIFIERS_TO_UPPERCASE</code><attribute>N</attribute></attribute>
      <attribute><code>INITIAL_POOL_SIZE</code><attribute>5</attribute></attribute>
      <attribute><code>IS_CLUSTERED</code><attribute>N</attribute></attribute>
      <attribute><code>MAXIMUM_POOL_SIZE</code><attribute>200</attribute></attribute>
      <attribute><code>POOLING_initialSize</code><attribute>5</attribute></attribute>
      <attribute><code>POOLING_maxActive</code><attribute>200</attribute></attribute>
      <attribute><code>POOLING_maxIdle</code><attribute>120000</attribute></attribute>
      <attribute><code>POOLING_maxWait</code><attribute>60000</attribute></attribute>
      <attribute><code>POOLING_minIdle</code><attribute>5</attribute></attribute>
      <attribute><code>POOLING_testOnBorrow</code><attribute>true</attribute></attribute>
      <attribute><code>POOLING_testOnReturn</code><attribute>true</attribute></attribute>
      <attribute><code>POOLING_testWhileIdle</code><attribute>true</attribute></attribute>
      <attribute><code>POOLING_validationQuery</code><attribute>SELECT 1 FROM DUAL</attribute></attribute>
      <attribute><code>PORT_NUMBER</code><attribute>3306</attribute></attribute>
      <attribute><code>PRESERVE_RESERVED_WORD_CASE</code><attribute>Y</attribute></attribute>
      <attribute><code>QUOTE_ALL_FIELDS</code><attribute>N</attribute></attribute>
      <attribute><code>STREAM_RESULTS</code><attribute>Y</attribute></attribute>
      <attribute><code>SUPPORTS_BOOLEAN_DATA_TYPE</code><attribute>Y</attribute></attribute>
      <attribute><code>SUPPORTS_TIMESTAMP_DATA_TYPE</code><attribute>Y</attribute></attribute>
      <attribute><code>USE_POOLING</code><attribute>Y</attribute></attribute>
    </attributes>
  </connection>
  <connection>
    <name>etl_prod</name>
    <server>10.116.152.6</server>
    <type>MYSQL</type>
    <access>Native</access>
    <database>etl_prod</database>
    <port>3306</port>
    <username>ETLMetadata</username>
    <password>Encrypted 2be98afc82bc68b83bd10a478cbc7fd88</password>
    <servername/>
    <data_tablespace/>
    <index_tablespace/>
    <attributes>
      <attribute><code>FORCE_IDENTIFIERS_TO_LOWERCASE</code><attribute>N</attribute></attribute>
      <attribute><code>FORCE_IDENTIFIERS_TO_UPPERCASE</code><attribute>N</attribute></attribute>
      <attribute><code>IS_CLUSTERED</code><attribute>N</attribute></attribute>
      <attribute><code>PORT_NUMBER</code><attribute>3306</attribute></attribute>
      <attribute><code>PRESERVE_RESERVED_WORD_CASE</code><attribute>Y</attribute></attribute>
      <attribute><code>QUOTE_ALL_FIELDS</code><attribute>N</attribute></attribute>
      <attribute><code>STREAM_RESULTS</code><attribute>Y</attribute></attribute>
      <attribute><code>SUPPORTS_BOOLEAN_DATA_TYPE</code><attribute>Y</attribute></attribute>
      <attribute><code>SUPPORTS_TIMESTAMP_DATA_TYPE</code><attribute>Y</attribute></attribute>
      <attribute><code>USE_POOLING</code><attribute>N</attribute></attribute>
    </attributes>
  </connection>
  <repository>
    <id>KettleDatabaseRepository</id>
    <name>mysql_repository</name>
    <description>Database repository</description>
    <is_default>true</is_default>
    <connection>kettle_repository</connection>
  </repository>
  <repository>
    <id>KettleDatabaseRepository</id>
    <name>old_etl_pord</name>
    <description>Database repository</description>
    <is_default>false</is_default>
    <connection>etl_prod</connection>
  </repository>
  <repository>
    <id>PentahoEnterpriseRepository</id>
    <name>singleDiServerInstance</name>
    <description>singleDiServerInstance</description>
    <is_default>false</is_default>
    <repository_location_url>http://localhost:9080/pentaho</repository_location_url>
    <version_comment_mandatory>N</version_comment_mandatory>
  </repository>
</repositories>


标题:Java连接Kettle资源库
作者:wenyl
地址:http://www.wenyoulong.com/articles/2021/05/20/1621495899670.html