HTTP and RPC Calls with Jaeger

HTTP Calls


Client Files

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from flask import Flask, request
from flask import jsonify
from flask_cors import CORS
import requests
import json

import registrator

from JaegerInjectAPI import TestInject

# Flask application object; the CORS rule below matches every route ("/*")
# and allows every origin ("*").
app = Flask(__name__)
cors = CORS(app, resources={r"/*": {"origins": "*"}})

# Connection to models
# Provider whose injectSpan() method is invoked by the /inject route.
JaegerInjectAPI_Provider = TestInject()

# Distributed Tracing
# The tracer is obtained from the project-local jaegerTracing module and is
# handed to FlaskTracing together with the app.
# NOTE(review): the positional True is presumably the "trace all requests"
# switch -- confirm against the Flask-OpenTracing constructor signature.
import jaegerTracing
tracer = jaegerTracing.getTracerInstance()
from flask_opentracing import FlaskTracing
tracing = FlaskTracing(tracer, True, app)


@app.route('/inject', methods=['GET'])
def testjaegerInject():
    """Handle GET /inject.

    Runs the provider's injectSpan() and replies with the string form of
    whatever it returns. Any exception is reported as a JSON-encoded
    message with HTTP status 500.
    """
    try:
        return str(JaegerInjectAPI_Provider.injectSpan())
    except Exception as error:
        failure_text = str(error)
        return jsonify(failure_text), 500





# Start the Flask server only when this file is executed as a script;
# host '0.0.0.0' makes it listen on all network interfaces.
if __name__ == '__main__':
    app.run(host='0.0.0.0')
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import json
from flask import jsonify
import requests

# Distributed Tracing
from opentracing_instrumentation.request_context import get_current_span, span_in_context
import jaegerTracing
tracer = jaegerTracing.getTracerInstance()
from opentracing.propagation import Format


class TestInject():
    """Client-side example: start a span and propagate it over an HTTP call."""

    def injectSpan(self, timeout=10):
        """Call the extract service with the span context in the request headers.

        Args:
            timeout: Seconds to wait for the downstream service before
                requests raises a timeout error. Defaults to 10.

        Returns:
            The requests.Response returned by the extract service.
        """
        with tracer.start_active_span('injectSpan') as scope:
            scope.span.set_tag("args", "I hope you will see me in server side")

            # Distributed Tracing with Jaeger
            # The carrier doubles as the outgoing header dict: inject() adds
            # the trace-propagation entries next to Content-Type.
            carrier = {"Content-Type": "application/json"}
            tracer.inject(
                span_context=scope.span.context,
                format=Format.TEXT_MAP,
                carrier=carrier)

            url = 'http://jaegerextract:5000/extract'
            data = json.dumps({'keyword': "Mohsin"})
            # Without a timeout a hung downstream service would block this
            # request, and keep the span open, indefinitely.
            response = requests.get(url, data=data, headers=carrier, timeout=timeout)
            return response

Server Files

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from flask import Flask, request
from flask import jsonify
from flask_cors import CORS
import requests
import json

import registrator

from JaegerExtractAPI import TestExtract

# Flask application object; the CORS rule below matches every route ("/*")
# and allows every origin ("*").
app = Flask(__name__)
cors = CORS(app, resources={r"/*": {"origins": "*"}})

# Connection to models
# Provider whose extractSpan() method is invoked by the /extract route.
TestExtract_Provider = TestExtract()

# Distributed Tracing
# The tracer is obtained from the project-local jaegerTracing module and is
# handed to FlaskTracing together with the app.
# NOTE(review): the positional True is presumably the "trace all requests"
# switch -- confirm against the Flask-OpenTracing constructor signature.
import jaegerTracing
tracer = jaegerTracing.getTracerInstance()
from flask_opentracing import FlaskTracing
tracing = FlaskTracing(tracer, True, app)


@app.route('/extract', methods=['GET'])
def testjaegerExtract():
    """Handle GET /extract.

    Runs the provider's extractSpan() for its tracing side effect and
    replies with the fixed body "server1". Any exception is reported as a
    JSON-encoded message with HTTP status 500.
    """
    try:
        TestExtract_Provider.extractSpan()
    except Exception as error:
        return jsonify(str(error)), 500
    return "server1"


# Start the Flask server only when this file is executed as a script;
# host '0.0.0.0' makes it listen on all network interfaces.
# NOTE(review): no port is passed here while the client calls port 5000 --
# presumably this relies on Flask's default port; confirm.
if __name__ == '__main__':
    app.run(host='0.0.0.0')
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import pymongo
from datetime import datetime
import pika
import json
from flask import jsonify
from bson.objectid import ObjectId


# Distributed Tracing
from opentracing_instrumentation.request_context import get_current_span, span_in_context
import jaegerTracing
tracer = jaegerTracing.getTracerInstance()
from opentracing.propagation import Format



class TestExtract():
    """Server-side example: continue the caller's trace from the request headers."""

    def extractSpan(self, carrier=None):
        """Start an 'extractSpan' span as a child of the propagated span context.

        Args:
            carrier: Mapping that holds the propagated trace entries. When
                omitted, the headers of the current Flask request are used;
                outside a request context an empty carrier is used.

        Returns:
            The literal string "Server2".
        """
        if carrier is None:
            # Imported locally so the method stays usable outside a Flask
            # request (it then falls back to an empty carrier).
            from flask import has_request_context, request
            carrier = dict(request.headers) if has_request_context() else {}

        # The client injects with Format.TEXT_MAP, so extract with the same
        # format. An empty carrier can never link this span to the caller.
        span_ctx = tracer.extract(format=Format.TEXT_MAP, carrier=carrier)
        with tracer.start_active_span(operation_name='extractSpan', child_of=span_ctx) as scope:
            scope.span.set_tag('args', 'input values here')

        return "Server2"

Useful Code Snippets


Client side

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# Distributed Tracing with Jaeger
carrier = {"Content-Type": "application/json"}
tracer.inject(
    span_context=scope.span.context,
    format=Format.TEXT_MAP,
    carrier=carrier)

url = 'http://jaegerextract:5000/extract'
# url = 'http://localhost:6570/company/searchByName'
data = json.dumps({'keyword':"Mohsin"})
response = requests.get(url, data = data, headers = carrier)
return response

Server side

1
2
3
4
5
6
def extractSpan(self, carrier=None):
    """Start an 'extractSpan' span as a child of the propagated span context.

    Args:
        carrier: Mapping that holds the propagated trace entries. When
            omitted, the headers of the current Flask request are used;
            outside a request context an empty carrier is used.

    Returns:
        The literal string "Server2".
    """
    if carrier is None:
        # Imported locally so the method stays usable outside a Flask
        # request (it then falls back to an empty carrier).
        from flask import has_request_context, request
        carrier = dict(request.headers) if has_request_context() else {}

    # Extract with the same format the client used for inject(). An empty
    # carrier can never link this span to the caller.
    span_ctx = tracer.extract(format=Format.TEXT_MAP, carrier=carrier)
    with tracer.start_active_span(operation_name='extractSpan', child_of=span_ctx) as scope:
        scope.span.set_tag('args', 'input values here')

    return "Server2"

RPC via RabbitMQ

Client Call

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def save_user_created(self, user_id, content):
    """Publish a user-created message to RabbitMQ with the trace context attached.

    Args:
        user_id: Identifier stored in the message's "user_id" field.
        content: JSON-serialisable payload stored in the message's "data" field.

    Returns:
        A (response, status) pair: 201 with a confirmation message on
        success, or 500 with the error text when anything fails.
    """
    # The try/except lives inside the span so the failure tag is written
    # while the span is still open and `scope` is guaranteed to be bound.
    with tracer.start_active_span('save_user_created') as scope:
        span = scope.span
        try:
            span.set_tag('args', [user_id, content])

            # Distributed Tracing (Injecting Span)
            # The carrier travels as the AMQP message headers.
            carrier = {}
            tracer.inject(span_context=span.context, format=Format.TEXT_MAP, carrier=carrier)

            _now = str(datetime.now())
            # publish notification for job applied
            message = {"user_id": user_id, "dated": _now, "data": content}
            message_str = json.dumps(message)
            print("sending body to save : " + message_str)

            # RabbitMQ Connection
            connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
            try:
                channel = connection.channel()
                channel.basic_publish(
                    exchange='save_user_name_image',
                    routing_key='',
                    body=message_str,
                    properties=pika.BasicProperties(headers=carrier),
                )
            finally:
                # Close the connection even when publishing fails.
                connection.close()

            # No explicit span.finish() here: leaving the with-block finishes
            # the span exactly once.
            # Return REST Reply
            return jsonify({"message": "User Name and Image Saved Mysql"}), 201
        except Exception as ex:
            span.set_tag('Exception', str(ex))
            return jsonify(str(ex)), 500

Exchange-Consumer Call

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
def callback(self, ch, method, properties, body):
    """Consume one message and insert the user's personal info into MySQL.

    The span created here is a child of the span context carried in the
    message headers, when one is present.

    Args:
        ch: Channel the message was delivered on (unused).
        method: Delivery metadata (unused).
        properties: Message properties; `headers` is read as the trace carrier.
        body: UTF-8 encoded JSON bytes with "user_id" and "data" keys, where
            "data" holds "userName" and optionally "userImage".
    """
    # A message published without headers may arrive with headers=None,
    # which is not a usable carrier; fall back to an empty mapping.
    carrier = properties.headers or {}
    span_ctx = tracer.extract(format=Format.TEXT_MAP, carrier=carrier)
    with tracer.start_active_span('callback', child_of=span_ctx) as scope:
        data = json.loads(body.decode("utf-8"))
        print("RECEIVED DATA: ", data)
        innerData = data['data']
        if 'userImage' in innerData:
            print("FIRST")
            insert_query = "INSERT INTO UserPersonalInfos (userId, legalName, image) VALUES (%s, %s, %s)"
            insert_data = (data['user_id'], innerData['userName'], innerData['userImage'])
        else:
            print("SECOND")
            insert_query = "INSERT INTO UserPersonalInfos (userId, legalName) VALUES (%s, %s)"
            insert_data = (data['user_id'], innerData['userName'])

        scope.span.set_tag("query", insert_query)
        scope.span.set_tag("queryData", insert_data)

        cursor = self.mysqlConnection.cursor()
        try:
            # Values are passed separately so the driver parameterises them.
            cursor.execute(insert_query, insert_data)
            self.mysqlConnection.commit()
        finally:
            # Release the cursor whether or not the insert succeeded.
            cursor.close()
        print("Personal Information Inserted Successfully.")