http-service
The http-service source receives POST requests via the HTTP and HTTPS protocols in formats such as text and JSON, and sends responses via its corresponding http-service-response sink, correlated through a unique source.id.
To correlate requests with responses, it generates a messageId for each incoming request and exposes it via transport
properties in the format trp:messageId, so that each response can be matched to its request at the http-service-response sink.
The request headers and properties can be accessed via transport properties in the format trp:<header>.
It also supports basic authentication to ensure events are received from authorized users/systems.
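
As an illustration of the basic authentication header a client would send, here is a minimal Python sketch. The helper name `basic_auth_header` and the credentials `admin` / `secret` are hypothetical and only serve the example; the header layout follows the standard HTTP Basic scheme (`Basic` followed by the Base64 encoding of `username:password`).

```python
import base64


def basic_auth_header(username: str, password: str) -> dict:
    """Build the Authorization header for HTTP basic authentication."""
    # Encode "username:password" as Base64, as the Basic scheme requires.
    credentials = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    return {"Authorization": f"Basic {token}"}


# Hypothetical credentials, used only for illustration.
headers = basic_auth_header("admin", "secret")
print(headers["Authorization"])
```

The resulting dictionary can be passed as request headers by whichever HTTP client posts events to the source.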
Syntax
CREATE SOURCE <NAME> WITH (type="http-service", map.type="<STRING>", receiver.url="<STRING>", source.id="<STRING>", connection.timeout="<INT>", basic.auth.enabled="<STRING>", worker.count="<INT>", socket.idle.timeout="<INT>", ssl.verify.client="<STRING>", ssl.protocol="<STRING>", tls.store.type="<STRING>", ssl.configurations="<STRING>", request.size.validation.configurations="<STRING>", header.validation.configurations="<STRING>", server.bootstrap.configurations="<STRING>", trace.log.enabled="<BOOL>")
Query Parameters
| Name | Description | Default Value | Possible Data Types | Optional | Dynamic | 
|---|---|---|---|---|---|
| receiver.url | The URL on which events should be received. To enable SSL, use the `https` protocol in the URL. | http://0.0.0.0:9763// | STRING | Yes | No | 
| source.id | Identifier that correlates the http-service source with its corresponding http-service-response sinks, which send the responses. | - | STRING | No | No | 
| connection.timeout | Connection timeout in milliseconds. The system sends a timeout if an associated http-service-response sink does not send a corresponding response within the given time. | 120000 | INT | Yes | No | 
| basic.auth.enabled | Only works in VM, Docker, and Kubernetes deployments. When enabled, each request is authenticated using the `Authorization: 'Basic encodeBase64(username:Password)'` header. | false | STRING | Yes | No | 
| worker.count | The number of active worker threads that serve the incoming events. The default value is `1`, which ensures that events are processed in the order they arrive. Increasing this value can improve performance at the expense of losing event ordering. | 1 | INT | Yes | No | 
| socket.idle.timeout | Idle timeout for the HTTP connection in milliseconds. | 120000 | INT | Yes | No | 
| ssl.verify.client | The type of client certificate verification. Supported values are `require` and `optional`. | - | STRING | Yes | No | 
| ssl.protocol | SSL/TLS protocol. | TLS | STRING | Yes | No | 
| tls.store.type | TLS store type. | JKS | STRING | Yes | No | 
| ssl.configurations | SSL/TLS configurations in the format `"'<key>:<value>','<key>:<value>'"`. Some supported parameters: - SSL/TLS protocols: `'sslEnabledProtocols:TLSv1.1,TLSv1.2'` - List of ciphers: `'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256'` - Enable session creation: `'client.enable.session.creation:true'` - Supported server names: `'server.suported.server.names:server'` - Add HTTP SNIMatcher: `'server.supported.snimatchers:SNIMatcher'` | - | STRING | Yes | No | 
| request.size.validation.configurations | Configurations to validate the HTTP request size. Expected format: `"'<key>:<value>','<key>:<value>'"`. Some supported configurations: - Enable request size validation: `'request.size.validation:true'` When request size validation is enabled: - Maximum request size: `'request.size.validation.maximum.value:2048'` - Response status code when request size validation fails: `'request.size.validation.reject.status.code:401'` - Response message when request size validation fails: `'request.size.validation.reject.message:Message is bigger than the valid size'` - Response Content-Type when request size validation fails: `'request.size.validation.reject.message.content.type:plain/text'` | - | STRING | Yes | No | 
| header.validation.configurations | Configurations to validate HTTP headers. Expected format: `"'<key>:<value>','<key>:<value>'"`. Some supported configurations: - Enable header size validation: `'header.size.validation:true'` When header size validation is enabled: - Maximum length of the initial line: `'header.validation.maximum.request.line:4096'` - Maximum length of all headers: `'header.validation.maximum.size:8192'` - Maximum length of the content or each chunk: `'header.validation.maximum.chunk.size:8192'` - Response status code when header validation fails: `'header.validation.reject.status.code:401'` - Response message when header validation fails: `'header.validation.reject.message:Message header is bigger than the valid size'` - Response Content-Type when header validation fails: `'header.validation.reject.message.content.type:plain/text'` | - | STRING | Yes | No | 
| server.bootstrap.configurations | Server bootstrap configurations in the format `"'<key>:<value>','<key>:<value>'"`. Some supported configurations: - Server connect timeout in milliseconds: `'server.bootstrap.connect.timeout:15000'` - Server socket timeout in seconds: `'server.bootstrap.socket.timeout:15'` - Enable TCP no delay: `'server.bootstrap.nodelay:true'` - Enable server keep alive: `'server.bootstrap.keepalive:true'` - Send buffer size: `'server.bootstrap.sendbuffersize:1048576'` - Receive buffer size: `'server.bootstrap.recievebuffersize:1048576'` - Number of connections queued: `'server.bootstrap.socket.backlog:100'` | - | STRING | Yes | No | 
| trace.log.enabled | Enable trace log for traffic monitoring. | false | BOOL | Yes | No | 
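
Several parameters in the table above (ssl.configurations, request.size.validation.configurations, header.validation.configurations, server.bootstrap.configurations) share the same `'<key>:<value>','<key>:<value>'` string format. The following Python sketch shows one way to assemble such a string from a dictionary. The helper name `format_configurations` is hypothetical, and the sketch assumes that values contain no single quotes or commas that would need escaping.

```python
def format_configurations(options: dict) -> str:
    """Join options into the 'key:value','key:value' format used by the source."""
    # Each pair becomes 'key:value'; pairs are separated by commas.
    return ",".join(f"'{key}:{value}'" for key, value in options.items())


# Keys taken from the server.bootstrap.configurations row of the table.
bootstrap = format_configurations({
    "server.bootstrap.nodelay": "true",
    "server.bootstrap.keepalive": "true",
})
print(bootstrap)
```

The printed string is what would go between the double quotes of the corresponding parameter in the `CREATE SOURCE` statement.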
System Parameters
| Name | Description | Default Value | Possible Parameters | 
|---|---|---|---|
| serverBootstrapBossGroupSize | Number of boss threads to accept incoming connections. | Number of available processors | Any positive integer | 
| serverBootstrapWorkerGroupSize | Number of worker threads to accept the connections from boss threads and perform non-blocking read and write from one or more channels. | (Number of available processors) * 2 | Any positive integer | 
| serverBootstrapClientGroupSize | Number of client threads to perform non-blocking read and write to one or more channels. | (Number of available processors) * 2 | Any positive integer | 
| defaultHost | The default host of the transport. | 0.0.0.0 | Any valid host | 
| defaultScheme | The default protocol. | http | `http`, `https` | 
| defaultHttpPort | The default HTTP port when default scheme is http. | 8280 | Any valid port | 
| defaultHttpsPort | The default HTTPS port when default scheme is https. | 8243 | Any valid port | 
| keyStoreLocation | The default keystore file path. | `${carbon.home}/resources/security/gdncarbon.jks` | Path to `.jks` file | 
| keyStorePassword | The default keystore password. | gdncarbon | Keystore password as string | 
Example
@App:name('Sample-HTTP-Source')
@App:description("This application shows how to receive POST requests via Stream Workers API.")
@App:qlVersion('2')
CREATE SOURCE AddStream WITH (type='http-service', source.id='adder', map.type='json', map.attributes.messageId='trp:messageId', map.attributes.value1='$.event.value1', map.attributes.value2='$.event.value2') (messageId string, value1 long, value2 long);
CREATE SINK ResultStream WITH (type='http-service-response', source.id='adder', message.id='{{messageId}}', map.type = 'json') (messageId string, results long);
@info(name = 'query1')
insert into ResultStream
select messageId, value1 + value2 as results
from AddStream;
The above sample listens for events sent as JSON messages in the following format:
{
  "event": {
    "value1": 3,
    "value2": 4
  }
}
It maps the events into AddStream, processes them through the query query1, and sends the results produced on ResultStream via the http-service-response sink in the following message format:
{
  "event": {
    "results": 7
  }
}
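
The request and response shapes above can be exercised from a client with a short Python sketch. It builds the POST request without sending it and uses a local stand-in for query1 to show the response the example is expected to produce. The endpoint in `RECEIVER_URL` is a hypothetical placeholder, not a documented address, and the helper names `build_request` and `expected_response` are assumptions made for illustration.

```python
import json
import urllib.request

# Hypothetical endpoint: replace it with the receiver.url configured on the source.
RECEIVER_URL = "http://localhost:9763/Sample-HTTP-Source/AddStream"


def build_request(value1: int, value2: int, url: str = RECEIVER_URL) -> urllib.request.Request:
    """Build (but do not send) a JSON POST request in the format the sample expects."""
    body = json.dumps({"event": {"value1": value1, "value2": value2}}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def expected_response(request_body: bytes) -> dict:
    """Local stand-in for query1: adds value1 and value2 from the request payload."""
    event = json.loads(request_body)["event"]
    return {"event": {"results": event["value1"] + event["value2"]}}


request = build_request(3, 4)
print(request.get_method(), request.full_url)
print(json.dumps(expected_response(request.data)))

# To send the request to a running stream worker, uncomment:
# with urllib.request.urlopen(request, timeout=120) as response:
#     print(json.loads(response.read()))
```

The `expected_response` function only mirrors the arithmetic of the sample query on the client side; the actual response is produced by the http-service-response sink.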