Skip to content

Latest commit

 

History

History
132 lines (118 loc) · 3.65 KB

flat-stream-reader.md

File metadata and controls

132 lines (118 loc) · 3.65 KB

The FlatStreamReader is for reading flat files a file system in streaming mode.

The options are optional.

  • maxFilesPerTrigger - the max # of files processed per micro-batch.

The field-schema information optional. If provided, it must be by either ddlFieldsString or ddlFieldsFile, and its content must be in the following format:

  • FieldName1:StartPos1-Length1 FieldType1, FieldName2:StartPos2-Length2 FieldType2, ...

Please note that the position starts from 1 (not 0).

Example:

    user:1-8 string, event:9-10 long, timestamp:19-32 string, interested:51-1 int
  • 1st field: user starting at position of 1 with length of 8 and type of string
  • 2nd field: event starting at position of 9 with length of 10 and type of long
  • 3rd field: timestamp starting at position of 19 with length of 32 and type of string
  • 4th field: interested starting at position of 51 with length of 1 and type of int

If the field-schema information is not provided, the output dataframe has the following schema:

   row_value string

To give a custom column name for the row_value, please use row.valueField properties in the definition.

For watermark configuration, the timeField is one field in the dataframe to be used for the delay calculation. To add a custom (processing) timestamp, please use the addTimestamp property. This column is added as the name of __timestamp.

To clean up processed files, please enable cleanSource option.

Actor Class: com.qwshen.etl.source.FlatStreamReader

The definition of the FlatReader:

  • In YAML format
  actor:
    type: flat-reader
    properties:
      options:
        header: true,
        maxFilesPerTrigger: 5
      ddlFieldsString: "user:1-8 string, event:9-10 long, timestamp:19-32 string, interested:51-1 int"
      watermark:
        timeField: __timestamp
        delayThreshold: 5 minutes
      addTimestamp: true
      fileUri: "${events.train_input}" 

or

  actor:
    type: flat-reader
    properties:
      ddlFieldsFile: schema/train.txt
      watermark:
        timeField: __timestamp
        delayThreshold: 5 minutes
      addTimestamp: true
      fileUri: "${events.train_input}" 
  • In JSON format
  {
    "actor": {
      "type": "flat-reader",
      "properties": {
        "row": {
          "valueField": "value"
        },
        "watermark": {
          "timeField": "__timestamp",
          "delayThreshold": "5 minutes"
        },
        "addTimestamp": true,
        "fileUri": "${events.train_input}"
      }
    }
  }

or

  {
    "actor": {
      "type": "flat-reader",
      "properties": {
        "ddlFieldsFile": "schema/train.txt",
        "watermark": {
          "timeField": "__timestamp",
          "delayThreshold": "5 minutes"
        },
        "addTimestamp": true,
        "fileUri": "${events.train_input}"
      }
    }
  }
  • In XML format
  <actor type="flat-reader">
    <properties>
      <ddlFieldsString>user:1-8 string, event:9-10 long, timestamp:19-32 string, interested:51-1 int</ddlFieldsString>
      <row>
        <valueField>value</valueField>
      </row>
      <watermark>
        <timeField>__tiemstamp</timeField>
        <delayThreshold>5 minutes</delayThreshold>
      </watermark>
      <addTimestamp>true</addTimestamp>
      <fileUri>${events.train_input}</fileUri>
    </properties>
  </actor>

or

  <actor type="flat-reader">
    <properties>
      <ddlFieldsFile>schema/train.txt</ddlFieldsFile>
      <watermark>
        <timeField>__tiemstamp</timeField>
        <delayThreshold>5 minutes</delayThreshold>
      </watermark>
      <addTimestamp>true</addTimestamp>
      <fileUri>${events.train_input}</fileUri>
    </properties>
  </actor>