3 votes

Ajouter un test unitaire pour Flink SQL

J'utilise Flink v1.7.1. Lorsque j'ai terminé un travail de streaming Flink avec tableSource, SQL et tableSink, je n'ai aucune idée de comment ajouter un test unitaire pour celui-ci.

2voto

Lifei Chen Points 70

J'ai trouvé un bon exemple sur la façon de tester flink sql avec l'aide de la liste de diffusion des utilisateurs, voici un exemple.

package org.apache.flink.table.runtime.stream.sql;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.runtime.utils.JavaStreamTestData;
import org.apache.flink.table.runtime.utils.StreamITCase;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;

import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

/**
 * Integration tests for streaming SQL.
 */
public class JavaSqlITCase extends AbstractTestBase {

    @Test
    public void testRowRegisterRowWithNames() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        StreamITCase.clear();

        List<Row> data = new ArrayList<>();
        data.add(Row.of(1, 1L, "Hi"));
        data.add(Row.of(2, 2L, "Hello"));
        data.add(Row.of(3, 2L, "Hello world"));

        TypeInformation<?>[] types = {
                BasicTypeInfo.INT_TYPE_INFO,
                BasicTypeInfo.LONG_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO};
        String[] names = {"a", "b", "c"};

        RowTypeInfo typeInfo = new RowTypeInfo(types, names);

        DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);

        Table in = tableEnv.fromDataStream(ds, "a,b,c");
        tableEnv.registerTable("MyTableRow", in);

        String sqlQuery = "SELECT a,c FROM MyTableRow";
        Table result = tableEnv.sqlQuery(sqlQuery);

        DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
        resultSet.addSink(new StreamITCase.StringSink<Row>());
        env.execute();

        List<String> expected = new ArrayList<>();
        expected.add("1,Hi");
        expected.add("2,Hello");
        expected.add("3,Hello world");

        StreamITCase.compareWithList(expected);
    }
}

le code correspondant est aquí

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X