Create a Custom Data Source
Data Source
Any kind of entity which provides read and write mechanism for data is considered a datasource. For example, an API, a SQL or NoSQL datastore which includes RDBMS or mongodb,postgresql, key value stores, document stores etc. The settings for each datasource lies in src/datasources directory.
Steps to create Custom Datasource
let's use kafka as an example of an datasource :
Project structure
.
├── src
├── datasources
│ ├── types
│ | └── kafka.ts
| |
│ └── kafka.yaml
│
├── events
| |
│ ├── kafka_publish_event.yaml
| |
| └── kafka_consumer_event.yaml
├── eventsources
│ ├── types
│ | └── kafka.ts
| |
│ └── kafka.yaml
|
└── functions
|
├── kafka-publish.yaml
|
└── kafka-consume.yaml
kafka config ( src/datasources/kafka.yaml )
type: Kafka
clientId: "kafka_proj"
brokers: ["kafka:9092"]
initializing client and execution ( src/datasources/types/Kafka.ts ) :
import { GSContext, GSDataSource, PlainObject } from "@godspeedsystems/core";
import { Kafka } from "kafkajs";
export default class DataSource extends GSDataSource {
protected async initClient(): Promise<PlainObject> {
const kafka = new Kafka({
clientId: this.config.clientId,
brokers: this.config.brokers,
});
return kafka;
}
async execute(ctx: GSContext, args: PlainObject): Promise<any> {
try {
const {
topic,
message,
meta: { fnNameInWorkflow },
} = args;
let method = fnNameInWorkflow.split(".")[2];
if (this.client) {
if (method === "producer") {
const producer = this.client.producer();
await producer.connect();
let result = await producer.send({
topic: topic,
messages: [{ value: message }],
});
return result;
} else {
return "Invalid method";
}
}
} catch (error) {
throw error;
}
}
}
Example Event ( src/events/kafka_publish_event.yaml ) :
'http.post./kafka-pub':
fn: kafka-publish
body:
content:
application/json:
schema:
type: object
properties:
message:
type: string
required: ['message']
responses:
200:
content:
application/json:
schema:
type: object
properties:
name:
type: string
Function Example ( src/functions/kafka-publish.yaml ) :
id: kafka-publish
summary: kafka publish message
tasks:
- id: publish
fn: datasource.kafka.producer
args:
topic: "publish-producer1"
message: <% inputs.body.message%>
Inside the
datasources
directory, create aYAML
file with a specific name. In this YAML file, ensure you specify atype
field, and there must be a correspondingTypeScript
file in thetypes
directory that shares the same name as thetype
you defined.In your TypeScript file, use an import statement to bring in
GSDataSource
from the@godspeedsystems/core
package. Then, create a class that inherits fromGSDataSource
.
.
├── src
├── datasources
│ ├── types
│ | └── custom_datasource.ts
| |
│ └── custom_datasource.yaml
│
├── events
|
|
|
├── eventsources
│
|
└── functions
Afterward, you can access the methods provided by
GSDataSource
. Initialize your client by calling theinitClient()
function.Once your client is initialized, you can execute its methods using the
execute
function.