1 继承SourceFunction和ParallelSourceFunction

import org.apache.flink.streaming.api.functions.source.SourceFunction;

重新run()和cancel()方法

2 AccessSource 代码

package com.zyb.flink.basic.source;
import com.zyb.flink.basic.bean.Access;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;

public class AccessSource implements SourceFunction<Access> {
    boolean isRunning = true;

    @Override
    public void run(SourceContext<Access> ctx) throws Exception {
        Random random = new Random();
        String[] domains = {"pk1.com","pk2.com","pk3.com","pk4.com","pk5."};

        while (isRunning){
            long time = System.currentTimeMillis();
            ctx.collect(new Access(time,domains[random.nextInt(domains.length)],random.nextInt(1000)));
        }
        Thread.sleep(2000);
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

3 Access代码

package com.zyb.flink.basic.bean;

public class Access {
    private long time;
    private String domain;
    private double traffic;

    @Override
    public String toString() {
        return "Access{" +
                "time=" + time +
                ", domain='" + domain + '\'' +
                ", traffic=" + traffic +
                '}';
    }

    public Access() {
    }

    public Access(long time, String domain, double traffic) {
        this.time = time;
        this.domain = domain;
        this.traffic = traffic;
    }

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }

    public String getDomain() {
        return domain;
    }

    public void setDomain(String domain) {
        this.domain = domain;
    }

    public double getTraffic() {
        return traffic;
    }

    public void setTraffic(double traffic) {
        this.traffic = traffic;
    }
}

4 测试代码

package com.zyb.flink.basic.source;
import com.zyb.flink.basic.bean.Access;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;

public class AccessSource implements SourceFunction<Access> {
    boolean isRunning = true;

    @Override
    public void run(SourceContext<Access> ctx) throws Exception {
        Random random = new Random();
        String[] domains = {"pk1.com","pk2.com","pk3.com","pk4.com","pk5."};

        while (isRunning){
            long time = System.currentTimeMillis();
            ctx.collect(new Access(time,domains[random.nextInt(domains.length)],random.nextInt(1000)));
        }
        Thread.sleep(2000);
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐