// Viewing File: /home/ubuntu/efiexchange-node-base/node_modules/@grpc/grpc-js/src/channel.ts

/*
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

import {
  Deadline,
  Call,
  Http2CallStream,
  CallStreamOptions,
  StatusObject,
} from './call-stream';
import { ChannelCredentials } from './channel-credentials';
import { ChannelOptions } from './channel-options';
import { ResolvingLoadBalancer } from './resolving-load-balancer';
import { SubchannelPool, getSubchannelPool } from './subchannel-pool';
import { ChannelControlHelper } from './load-balancer';
import { UnavailablePicker, Picker, PickResultType } from './picker';
import { Metadata } from './metadata';
import { Status, LogVerbosity, Propagate } from './constants';
import { FilterStackFactory } from './filter-stack';
import { CallCredentialsFilterFactory } from './call-credentials-filter';
import { DeadlineFilterFactory } from './deadline-filter';
import { CompressionFilterFactory } from './compression-filter';
import {
  CallConfig,
  ConfigSelector,
  getDefaultAuthority,
  mapUriDefaultScheme,
} from './resolver';
import { trace, log } from './logging';
import { SubchannelAddress } from './subchannel-address';
import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
import { mapProxyName } from './http_proxy';
import { GrpcUri, parseUri, uriToString } from './uri-parser';
import { ServerSurfaceCall } from './server-call';
import { Filter } from './filter';

import { ConnectivityState } from './connectivity-state';
import { ChannelInfo, ChannelRef, ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzChannel, SubchannelRef, unregisterChannelzRef } from './channelz';
import { Subchannel } from './subchannel';

/**
 * The largest timer delay, in milliseconds, that fits in a signed 32-bit
 * integer (2^31 - 1).
 * See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
 */
const MAX_TIMEOUT_TIME = 2147483647;

/* Module-wide counter holding the number that the next call will receive. */
let nextCallNumber = 0;

/**
 * Hand out the next call number and advance the module-wide counter.
 * The counter restarts at 0 once it would reach Number.MAX_SAFE_INTEGER.
 * @returns The number assigned to the new call.
 */
function getNewCallNumber(): number {
  const assigned = nextCallNumber;
  const following = assigned + 1;
  nextCallNumber = following >= Number.MAX_SAFE_INTEGER ? 0 : following;
  return assigned;
}

/**
 * An interface that represents a communication channel to a server specified
 * by a given address.
 */
export interface Channel {
  /**
   * Close the channel. This has the same functionality as the existing
   * grpc.Client.prototype.close
   */
  close(): void;
  /**
   * Return the target that this channel connects to, as a string.
   */
  getTarget(): string;
  /**
   * Get the channel's current connectivity state. This method is here mainly
   * because it is in the existing internal Channel class, and there isn't
   * another good place to put it.
   * @param tryToConnect If true, the channel will start connecting if it is
   *     idle. Otherwise, idle channels will only start connecting when a
   *     call starts.
   */
  getConnectivityState(tryToConnect: boolean): ConnectivityState;
  /**
   * Watch for connectivity state changes. This is also here mainly because
   * it is in the existing external Channel class.
   * @param currentState The state to watch for transitions from. This should
   *     always be populated by calling getConnectivityState immediately
   *     before.
   * @param deadline A deadline for waiting for a state change
   * @param callback Called with no error when the state changes, or with an
   *     error if the deadline passes without a state change.
   */
  watchConnectivityState(
    currentState: ConnectivityState,
    deadline: Date | number,
    callback: (error?: Error) => void
  ): void;
  /**
   * Get the channelz reference object for this channel. A request to the
   * channelz service for the id in this object will provide information
   * about this channel.
   */
  getChannelzRef(): ChannelRef;
  /**
   * Create a call object. Call is an opaque type that is used by the Client
   * class. This function is called by the gRPC library when starting a
   * request. Implementers should return an instance of Call that is returned
   * from calling createCall on an instance of the provided Channel class.
   * @param method The full method string to request.
   * @param deadline The call deadline
   * @param host A host string override for making the request
   * @param parentCall A server call to propagate some information from
   * @param propagateFlags A bitwise combination of elements of grpc.propagate
   *     that indicates what information to propagate from parentCall.
   */
  createCall(
    method: string,
    deadline: Deadline,
    host: string | null | undefined,
    parentCall: ServerSurfaceCall | null,
    propagateFlags: number | null | undefined
  ): Call;
}

/**
 * Bookkeeping record for one pending watchConnectivityState request.
 */
interface ConnectivityStateWatcher {
  /** The state the caller passed in; the watcher fires once the channel's
   * state differs from it. */
  currentState: ConnectivityState;
  /** Deadline timer for this watcher, or null when no timer was set (the
   * deadline was Infinity). */
  timer: NodeJS.Timeout | null;
  /** Invoked with no argument on a state change, or with an Error when the
   * deadline timer fires first. */
  callback: (error?: Error) => void;
}

/**
 * The Channel implementation in this module. It owns a
 * ResolvingLoadBalancer and a subchannel pool, queues calls until a
 * ConfigSelector and a usable picker are available, and then starts each call
 * on the subchannel chosen by the current picker.
 */
export class ChannelImplementation implements Channel {
  private resolvingLoadBalancer: ResolvingLoadBalancer;
  private subchannelPool: SubchannelPool;
  private connectivityState: ConnectivityState = ConnectivityState.IDLE;
  private currentPicker: Picker = new UnavailablePicker();
  /**
   * Calls queued up to get a call config. Should only be populated before the
   * first time the resolver returns a result, which includes the ConfigSelector.
   */
  private configSelectionQueue: Array<{
    callStream: Http2CallStream;
    callMetadata: Metadata;
  }> = [];
  /**
   * Calls that have a call config but have not yet been assigned to a
   * subchannel. They are re-picked every time the load balancer reports a
   * new picker.
   */
  private pickQueue: Array<{
    callStream: Http2CallStream;
    callMetadata: Metadata;
    callConfig: CallConfig;
    dynamicFilters: Filter[];
  }> = [];
  private connectivityStateWatchers: ConnectivityStateWatcher[] = [];
  private defaultAuthority: string;
  private filterStackFactory: FilterStackFactory;
  private target: GrpcUri;
  /**
   * This timer does not do anything on its own. Its purpose is to hold the
   * event loop open while there are any pending calls for the channel that
   * have not yet been assigned to specific subchannels. In other words,
   * the invariant is that callRefTimer is reffed if and only if
   * configSelectionQueue or pickQueue is non-empty.
   */
  private callRefTimer: NodeJS.Timer;
  private configSelector: ConfigSelector | null = null;
  /**
   * This is the error from the name resolver if it failed most recently. It
   * is only used to end calls that start while there is no config selector
   * and the name resolver is in backoff, so it should be nulled if
   * configSelector becomes set or the channel state becomes anything other
   * than TRANSIENT_FAILURE.
   */
  private currentResolutionError: StatusObject | null = null;

  // Channelz info
  private readonly channelzEnabled: boolean = true;
  private originalTarget: string;
  private channelzRef: ChannelRef;
  private channelzTrace: ChannelzTrace;
  private callTracker = new ChannelzCallTracker();
  private childrenTracker = new ChannelzChildrenTracker();

  /**
   * @param target The target name to connect to. It must be parseable as a
   *     URI and map to a registered resolver scheme.
   * @param credentials The channel credentials to use for connections.
   * @param options Channel options.
   * @throws TypeError if an argument has the wrong type.
   * @throws Error if the target cannot be parsed or has no default scheme.
   */
  constructor(
    target: string,
    private readonly credentials: ChannelCredentials,
    private readonly options: ChannelOptions
  ) {
    if (typeof target !== 'string') {
      throw new TypeError('Channel target must be a string');
    }
    if (!(credentials instanceof ChannelCredentials)) {
      throw new TypeError(
        'Channel credentials must be a ChannelCredentials object'
      );
    }
    if (options) {
      if (typeof options !== 'object') {
        throw new TypeError('Channel options must be an object');
      }
    }
    this.originalTarget = target;
    const originalTargetUri = parseUri(target);
    if (originalTargetUri === null) {
      throw new Error(`Could not parse target name "${target}"`);
    }
    /* This ensures that the target has a scheme that is registered with the
     * resolver */
    const defaultSchemeMapResult = mapUriDefaultScheme(originalTargetUri);
    if (defaultSchemeMapResult === null) {
      throw new Error(
        `Could not find a default scheme for target name "${target}"`
      );
    }

    /* The timer starts out unreffed because both call queues are empty. */
    this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME);
    this.callRefTimer.unref?.();

    if (this.options['grpc.enable_channelz'] === 0) {
      this.channelzEnabled = false;
    }

    this.channelzTrace = new ChannelzTrace();
    this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo(), this.channelzEnabled);
    if (this.channelzEnabled) {
      this.channelzTrace.addTrace('CT_INFO', 'Channel created');
    }

    if (this.options['grpc.default_authority']) {
      this.defaultAuthority = this.options['grpc.default_authority'] as string;
    } else {
      this.defaultAuthority = getDefaultAuthority(defaultSchemeMapResult);
    }
    const proxyMapResult = mapProxyName(defaultSchemeMapResult, options);
    this.target = proxyMapResult.target;
    this.options = Object.assign({}, this.options, proxyMapResult.extraOptions);

    /* The global boolean parameter to getSubchannelPool has the inverse meaning to what
     * the grpc.use_local_subchannel_pool channel option means. */
    this.subchannelPool = getSubchannelPool(
      (options['grpc.use_local_subchannel_pool'] ?? 0) === 0
    );
    const channelControlHelper: ChannelControlHelper = {
      createSubchannel: (
        subchannelAddress: SubchannelAddress,
        subchannelArgs: ChannelOptions
      ) => {
        const subchannel = this.subchannelPool.getOrCreateSubchannel(
          this.target,
          subchannelAddress,
          Object.assign({}, this.options, subchannelArgs),
          this.credentials
        );
        if (this.channelzEnabled) {
          this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef());
        }
        return subchannel;
      },
      updateState: (connectivityState: ConnectivityState, picker: Picker) => {
        this.currentPicker = picker;
        /* Swap the queue out before re-picking so that calls which are
         * queued again by tryPick land in a fresh array instead of the one
         * being iterated. */
        const queueCopy = this.pickQueue.slice();
        this.pickQueue = [];
        this.callRefTimerUnref();
        for (const { callStream, callMetadata, callConfig, dynamicFilters } of queueCopy) {
          this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
        }
        this.updateState(connectivityState);
      },
      requestReresolution: () => {
        // This should never be called.
        throw new Error(
          'Resolving load balancer should never call requestReresolution'
        );
      },
      addChannelzChild: (child: ChannelRef | SubchannelRef) => {
        if (this.channelzEnabled) {
          this.childrenTracker.refChild(child);
        }
      },
      removeChannelzChild: (child: ChannelRef | SubchannelRef) => {
        if (this.channelzEnabled) {
          this.childrenTracker.unrefChild(child);
        }
      }
    };
    this.resolvingLoadBalancer = new ResolvingLoadBalancer(
      this.target,
      channelControlHelper,
      options,
      (configSelector) => {
        if (this.channelzEnabled) {
          this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
        }
        this.configSelector = configSelector;
        this.currentResolutionError = null;
        /* We process the queue asynchronously to ensure that the corresponding
         * load balancer update has completed. */
        process.nextTick(() => {
          const localQueue = this.configSelectionQueue;
          this.configSelectionQueue = [];
          this.callRefTimerUnref();
          /* configSelector is set at this point, so tryGetConfig never puts
           * a call back on configSelectionQueue and the queue does not need
           * to be cleared again after the loop. */
          for (const { callStream, callMetadata } of localQueue) {
            this.tryGetConfig(callStream, callMetadata);
          }
        });
      },
      (status) => {
        if (this.channelzEnabled) {
          this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"');
        }
        if (this.configSelectionQueue.length > 0) {
          this.trace('Name resolution failed with calls queued for config selection');
        }
        if (this.configSelector === null) {
          this.currentResolutionError = status;
        }
        const localQueue = this.configSelectionQueue;
        this.configSelectionQueue = [];
        this.callRefTimerUnref();
        /* Calls with waitForReady stay queued for the next resolution
         * attempt; all other queued calls fail with the resolver's status. */
        for (const { callStream, callMetadata } of localQueue) {
          if (callMetadata.getOptions().waitForReady) {
            this.callRefTimerRef();
            this.configSelectionQueue.push({ callStream, callMetadata });
          } else {
            callStream.cancelWithStatus(status.code, status.details);
          }
        }
      }
    );
    this.filterStackFactory = new FilterStackFactory([
      new CallCredentialsFilterFactory(this),
      new DeadlineFilterFactory(this),
      new MaxMessageSizeFilterFactory(this.options),
      new CompressionFilterFactory(this, this.options),
    ]);
    this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2));
    const error = new Error();
    trace(LogVerbosity.DEBUG, 'channel_stacktrace', '(' + this.channelzRef.id + ') ' + 'Channel constructed \n' + error.stack?.substring(error.stack.indexOf('\n')+1));
  }

  /**
   * Build the snapshot handed to channelz when this channel is queried.
   */
  private getChannelzInfo(): ChannelInfo {
    return {
      target: this.originalTarget,
      state: this.connectivityState,
      trace: this.channelzTrace,
      callTracker: this.callTracker,
      children: this.childrenTracker.getChildLists()
    };
  }

  /**
   * Emit a trace line on the 'channel' tracer, prefixed with the channelz id
   * and the target.
   * @param text The message to log
   * @param verbosityOverride Verbosity to log at; defaults to DEBUG
   */
  private trace(text: string, verbosityOverride?: LogVerbosity) {
    trace(verbosityOverride ?? LogVerbosity.DEBUG, 'channel', '(' + this.channelzRef.id + ') ' + uriToString(this.target) + ' ' + text);
  }

  /**
   * Ref callRefTimer so that it holds the event loop open. Does nothing if
   * the timer reports that it is already reffed.
   */
  private callRefTimerRef() {
    // If the hasRef function does not exist, always run the code
    if (!this.callRefTimer.hasRef?.()) {
      this.trace(
        'callRefTimer.ref | configSelectionQueue.length=' +
          this.configSelectionQueue.length +
          ' pickQueue.length=' +
          this.pickQueue.length
      );
      this.callRefTimer.ref?.();
    }
  }

  /**
   * Unref callRefTimer so that it no longer holds the event loop open. The
   * timer is left reffed while either call queue still contains calls:
   * callers invoke this right after draining one queue, and calls waiting in
   * the other queue still need the event loop to stay alive.
   */
  private callRefTimerUnref() {
    if (this.configSelectionQueue.length > 0 || this.pickQueue.length > 0) {
      return;
    }
    // If the hasRef function does not exist, always run the code
    if (!this.callRefTimer.hasRef || this.callRefTimer.hasRef()) {
      this.trace(
        'callRefTimer.unref | configSelectionQueue.length=' +
          this.configSelectionQueue.length +
          ' pickQueue.length=' +
          this.pickQueue.length
      );
      this.callRefTimer.unref?.();
    }
  }

  /**
   * Queue a call to be picked again on the next picker update, and make sure
   * the event loop stays open while it waits.
   */
  private pushPick(
    callStream: Http2CallStream,
    callMetadata: Metadata,
    callConfig: CallConfig,
    dynamicFilters: Filter[]
  ) {
    this.pickQueue.push({ callStream, callMetadata, callConfig, dynamicFilters });
    this.callRefTimerRef();
  }

  /**
   * Check the picker output for the given call and corresponding metadata,
   * and take any relevant actions. Should not be called while iterating
   * over pickQueue.
   * @param callStream The call to start
   * @param callMetadata The metadata passed to the picker and, after a clone
   *     is run through the call's filter stack, sent with the call
   * @param callConfig The call config selected for this call
   * @param dynamicFilters Filters created from the call config's dynamic
   *     filter factories, passed on to the subchannel when the call starts
   * @throws Error if the picker returns an unknown pick result type
   */
  private tryPick(
    callStream: Http2CallStream,
    callMetadata: Metadata,
    callConfig: CallConfig,
    dynamicFilters: Filter[]
  ) {
    const pickResult = this.currentPicker.pick({
      metadata: callMetadata,
      extraPickInfo: callConfig.pickInformation,
    });
    const subchannelString = pickResult.subchannel ?
      '(' + pickResult.subchannel.getChannelzRef().id + ') ' + pickResult.subchannel.getAddress() :
      '' + pickResult.subchannel;
    this.trace(
      'Pick result for call [' +
        callStream.getCallNumber() +
        ']: ' +
        PickResultType[pickResult.pickResultType] +
        ' subchannel: ' +
        subchannelString +
        ' status: ' +
        pickResult.status?.code +
        ' ' +
        pickResult.status?.details
    );
    switch (pickResult.pickResultType) {
      case PickResultType.COMPLETE:
        if (pickResult.subchannel === null) {
          // End the call with an error
          callStream.cancelWithStatus(
            Status.UNAVAILABLE,
            'Request dropped by load balancing policy'
          );
        } else {
          /* If the subchannel is not in the READY state, that indicates a bug
           * somewhere in the load balancer or picker. So, we log an error and
           * queue the pick to be tried again later. */
          if (
            pickResult.subchannel!.getConnectivityState() !==
            ConnectivityState.READY
          ) {
            log(
              LogVerbosity.ERROR,
              'Error: COMPLETE pick result subchannel ' +
                subchannelString +
                ' has state ' +
                ConnectivityState[pickResult.subchannel!.getConnectivityState()]
            );
            this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
            break;
          }
          /* We need to clone the callMetadata here because the transparent
           * retry code in the promise resolution handler use the same
           * callMetadata object, so it needs to stay unmodified */
          callStream.filterStack
            .sendMetadata(Promise.resolve(callMetadata.clone()))
            .then(
              (finalMetadata) => {
                const subchannelState: ConnectivityState = pickResult.subchannel!.getConnectivityState();
                if (subchannelState === ConnectivityState.READY) {
                  try {
                    const pickExtraFilters = pickResult.extraFilterFactories.map(factory => factory.createFilter(callStream));
                    pickResult.subchannel?.getRealSubchannel().startCallStream(
                      finalMetadata,
                      callStream,
                      [...dynamicFilters, ...pickExtraFilters]
                    );
                    /* If we reach this point, the call stream has started
                     * successfully */
                    callConfig.onCommitted?.();
                    pickResult.onCallStarted?.();
                  } catch (error) {
                    const errorCode = (error as NodeJS.ErrnoException).code;
                    if (errorCode === 'ERR_HTTP2_GOAWAY_SESSION' ||
                        errorCode === 'ERR_HTTP2_INVALID_SESSION'
                    ) {
                      /* An error here indicates that something went wrong with
                       * the picked subchannel's http2 stream right before we
                       * tried to start the stream. We are handling a promise
                       * result here, so this is asynchronous with respect to the
                       * original tryPick call, so calling it again is not
                       * recursive. We call tryPick immediately instead of
                       * queueing this pick again because handling the queue is
                       * triggered by state changes, and we want to immediately
                       * check if the state has already changed since the
                       * previous tryPick call. We do this instead of cancelling
                       * the stream because the correct behavior may be
                       * re-queueing instead, based on the logic in the rest of
                       * tryPick */
                      this.trace(
                        'Failed to start call on picked subchannel ' +
                          subchannelString +
                          ' with error ' +
                          (error as Error).message +
                          '. Retrying pick',
                          LogVerbosity.INFO
                      );
                      this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
                    } else {
                      this.trace(
                        'Failed to start call on picked subchanel ' +
                          subchannelString +
                          ' with error ' +
                          (error as Error).message +
                          '. Ending call',
                          LogVerbosity.INFO
                      );
                      callStream.cancelWithStatus(
                        Status.INTERNAL,
                        `Failed to start HTTP/2 stream with error: ${
                          (error as Error).message
                        }`
                      );
                    }
                  }
                } else {
                  /* The logic for doing this here is the same as in the catch
                   * block above */
                  this.trace(
                    'Picked subchannel ' +
                      subchannelString +
                      ' has state ' +
                      ConnectivityState[subchannelState] +
                      ' after metadata filters. Retrying pick',
                      LogVerbosity.INFO
                  );
                  this.tryPick(callStream, callMetadata, callConfig, dynamicFilters);
                }
              },
              (error: Error & { code: number }) => {
                // We assume the error code isn't 0 (Status.OK)
                callStream.cancelWithStatus(
                  typeof error.code === 'number' ? error.code : Status.UNKNOWN,
                  `Getting metadata from plugin failed with error: ${error.message}`
                );
              }
            );
        }
        break;
      case PickResultType.QUEUE:
        this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
        break;
      case PickResultType.TRANSIENT_FAILURE:
        if (callMetadata.getOptions().waitForReady) {
          this.pushPick(callStream, callMetadata, callConfig, dynamicFilters);
        } else {
          callStream.cancelWithStatus(
            pickResult.status!.code,
            pickResult.status!.details
          );
        }
        break;
      case PickResultType.DROP:
        callStream.cancelWithStatus(
          pickResult.status!.code,
          pickResult.status!.details
        );
        break;
      default:
        throw new Error(
          `Invalid state: unknown pickResultType ${pickResult.pickResultType}`
        );
    }
  }

  /**
   * Remove a watcher from connectivityStateWatchers. Does nothing if the
   * watcher is not in the list.
   */
  private removeConnectivityStateWatcher(
    watcherObject: ConnectivityStateWatcher
  ) {
    const watcherIndex = this.connectivityStateWatchers.indexOf(watcherObject);
    if (watcherIndex >= 0) {
      this.connectivityStateWatchers.splice(watcherIndex, 1);
    }
  }

  /**
   * Record a new connectivity state, trace the transition, and notify every
   * watcher that was registered with a different state.
   * @param newState The state the channel is moving to
   */
  private updateState(newState: ConnectivityState): void {
    trace(
      LogVerbosity.DEBUG,
      'connectivity_state',
      '(' + this.channelzRef.id + ') ' +
        uriToString(this.target) +
        ' ' +
        ConnectivityState[this.connectivityState] +
        ' -> ' +
        ConnectivityState[newState]
    );
    if (this.channelzEnabled) {
      this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]);
    }
    this.connectivityState = newState;
    /* Iterate over a copy because each notified watcher is removed from the
     * live list inside the loop. */
    const watchersCopy = this.connectivityStateWatchers.slice();
    for (const watcherObject of watchersCopy) {
      if (newState !== watcherObject.currentState) {
        if (watcherObject.timer) {
          clearTimeout(watcherObject.timer);
        }
        this.removeConnectivityStateWatcher(watcherObject);
        watcherObject.callback();
      }
    }
    if (newState !== ConnectivityState.TRANSIENT_FAILURE) {
      this.currentResolutionError = null;
    }
  }

  /**
   * Select a call config for the call and hand it to tryPick. If no
   * ConfigSelector is available yet, the call is queued, or ended right away
   * when name resolution has failed and the call does not use waitForReady.
   * @param stream The call to configure
   * @param metadata The call's metadata
   */
  private tryGetConfig(stream: Http2CallStream, metadata: Metadata) {
    if (stream.getStatus() !== null) {
      /* If the stream has a status, it has already finished and we don't need
       * to take any more actions on it. */
      return;
    }
    if (this.configSelector === null) {
      /* This branch will only be taken at the beginning of the channel's life,
       * before the resolver ever returns a result. So, the
       * ResolvingLoadBalancer may be idle and if so it needs to be kicked
       * because it now has a pending request. */
      this.resolvingLoadBalancer.exitIdle();
      if (this.currentResolutionError && !metadata.getOptions().waitForReady) {
        stream.cancelWithStatus(this.currentResolutionError.code, this.currentResolutionError.details);
      } else {
        this.configSelectionQueue.push({
          callStream: stream,
          callMetadata: metadata,
        });
        this.callRefTimerRef();
      }
    } else {
      const callConfig = this.configSelector(stream.getMethod(), metadata);
      if (callConfig.status === Status.OK) {
        if (callConfig.methodConfig.timeout) {
          const deadline = new Date();
          deadline.setSeconds(
            deadline.getSeconds() + callConfig.methodConfig.timeout.seconds
          );
          deadline.setMilliseconds(
            deadline.getMilliseconds() +
              callConfig.methodConfig.timeout.nanos / 1_000_000
          );
          stream.setConfigDeadline(deadline);
          // Refreshing the filters makes the deadline filter pick up the new deadline
          stream.filterStack.refresh();
        }
        if (callConfig.dynamicFilterFactories.length > 0) {
          /* These dynamicFilters are the mechanism for implementing gRFC A39:
           * https://github.com/grpc/proposal/blob/master/A39-xds-http-filters.md
           * We run them here instead of with the rest of the filters because
           * that spec says "the xDS HTTP filters will run in between name
           * resolution and load balancing".
           *
           * We use the filter stack here to simplify the multi-filter async
           * waterfall logic, but we pass along the underlying list of filters
           * to avoid having nested filter stacks when combining it with the
           * original filter stack. We do not pass along the original filter
           * factory list because these filters may need to persist data
           * between sending headers and other operations. */
          const dynamicFilterStackFactory = new FilterStackFactory(callConfig.dynamicFilterFactories);
          const dynamicFilterStack = dynamicFilterStackFactory.createFilter(stream);
          dynamicFilterStack.sendMetadata(Promise.resolve(metadata)).then(
            (filteredMetadata) => {
              this.tryPick(stream, filteredMetadata, callConfig, dynamicFilterStack.getFilters());
            },
            (error: Error & { code: number }) => {
              /* Without this handler a rejecting filter would produce an
               * unhandled rejection and the call would never be ended. This
               * mirrors the metadata error handling in tryPick. We assume
               * the error code isn't 0 (Status.OK) */
              stream.cancelWithStatus(
                typeof error.code === 'number' ? error.code : Status.UNKNOWN,
                `Processing metadata in dynamic filters failed with error: ${error.message}`
              );
            }
          );
        } else {
          this.tryPick(stream, metadata, callConfig, []);
        }
      } else {
        stream.cancelWithStatus(
          callConfig.status,
          'Failed to route call to method ' + stream.getMethod()
        );
      }
    }
  }

  /**
   * Begin processing a call on this channel. The metadata is cloned first so
   * the object passed in is not the one handed to config selection.
   */
  _startCallStream(stream: Http2CallStream, metadata: Metadata) {
    this.tryGetConfig(stream, metadata.clone());
  }

  /**
   * Shut the channel down: destroy the load balancer, move to SHUTDOWN
   * (which notifies pending connectivity watchers), stop the call ref timer
   * and unregister from channelz.
   */
  close() {
    this.resolvingLoadBalancer.destroy();
    this.updateState(ConnectivityState.SHUTDOWN);
    clearInterval(this.callRefTimer);
    if (this.channelzEnabled) {
      unregisterChannelzRef(this.channelzRef);
    }

    this.subchannelPool.unrefUnusedSubchannels();
  }

  /**
   * Return the channel's target as a string. This is the target after
   * default-scheme and proxy mapping, not necessarily the string passed to
   * the constructor.
   */
  getTarget() {
    return uriToString(this.target);
  }

  /**
   * Return the connectivity state as it was before any connection attempt
   * triggered by this call.
   * @param tryToConnect If true, ask the load balancer to exit idle
   */
  getConnectivityState(tryToConnect: boolean) {
    const connectivityState = this.connectivityState;
    if (tryToConnect) {
      this.resolvingLoadBalancer.exitIdle();
    }
    return connectivityState;
  }

  /**
   * Register a one-shot watcher for a connectivity state change.
   * @param currentState The state to watch for transitions from
   * @param deadline When to give up waiting. Infinity means no timeout; a
   *     deadline that has already passed fails on the next tick.
   * @param callback Called with no error on a state change, or with an error
   *     if the deadline passes first
   * @throws Error if the channel has been shut down
   */
  watchConnectivityState(
    currentState: ConnectivityState,
    deadline: Date | number,
    callback: (error?: Error) => void
  ): void {
    if (this.connectivityState === ConnectivityState.SHUTDOWN) {
      throw new Error('Channel has been shut down');
    }
    const watcherObject: ConnectivityStateWatcher = {
      currentState,
      callback,
      timer: null,
    };
    if (deadline !== Infinity) {
      const deadlineDate: Date =
        deadline instanceof Date ? deadline : new Date(deadline);
      const now = new Date();
      if (deadline === -Infinity || deadlineDate <= now) {
        process.nextTick(
          callback,
          new Error('Deadline passed without connectivity state change')
        );
        return;
      }
      const deadlineTime = deadlineDate.getTime();
      const onDeadline = () => {
        this.removeConnectivityStateWatcher(watcherObject);
        callback(
          new Error('Deadline passed without connectivity state change')
        );
      };
      /* Node.js replaces any delay larger than MAX_TIMEOUT_TIME with 1 ms,
       * which would report a far-future deadline as already passed. Longer
       * waits are therefore split into chunks of at most MAX_TIMEOUT_TIME.
       * watcherObject.timer always holds the timer that is currently armed
       * so that updateState can still clear it. */
      const armTimer = (remainingMs: number) => {
        if (remainingMs > MAX_TIMEOUT_TIME) {
          watcherObject.timer = setTimeout(() => {
            armTimer(deadlineTime - Date.now());
          }, MAX_TIMEOUT_TIME);
        } else {
          watcherObject.timer = setTimeout(onDeadline, remainingMs);
        }
      };
      armTimer(deadlineTime - now.getTime());
    }
    this.connectivityStateWatchers.push(watcherObject);
  }

  /**
   * Get the channelz reference object for this channel. The returned value is
   * garbage if channelz is disabled for this channel.
   * @returns The channelz reference created when the channel was constructed
   */
  getChannelzRef() {
    return this.channelzRef;
  }

  /**
   * Create a call on this channel. The call is only constructed here; it is
   * routed once _startCallStream is invoked for it.
   * @param method The full method string to request
   * @param deadline The call deadline
   * @param host Authority override; the channel's default authority is used
   *     when null or undefined
   * @param parentCall A server call to propagate some information from
   * @param propagateFlags Propagation flags; Propagate.DEFAULTS is used when
   *     null or undefined
   * @throws TypeError if method or deadline has the wrong type
   * @throws Error if the channel has been shut down
   */
  createCall(
    method: string,
    deadline: Deadline,
    host: string | null | undefined,
    parentCall: ServerSurfaceCall | null,
    propagateFlags: number | null | undefined
  ): Call {
    if (typeof method !== 'string') {
      throw new TypeError('Channel#createCall: method must be a string');
    }
    if (!(typeof deadline === 'number' || deadline instanceof Date)) {
      throw new TypeError(
        'Channel#createCall: deadline must be a number or Date'
      );
    }
    if (this.connectivityState === ConnectivityState.SHUTDOWN) {
      throw new Error('Channel has been shut down');
    }
    const callNumber = getNewCallNumber();
    this.trace(
      'createCall [' +
        callNumber +
        '] method="' +
        method +
        '", deadline=' +
        deadline
    );
    const finalOptions: CallStreamOptions = {
      deadline: deadline,
      flags: propagateFlags ?? Propagate.DEFAULTS,
      host: host ?? this.defaultAuthority,
      parentCall: parentCall,
    };
    const stream: Http2CallStream = new Http2CallStream(
      method,
      this,
      finalOptions,
      this.filterStackFactory,
      this.credentials._getCallCredentials(),
      callNumber
    );
    if (this.channelzEnabled) {
      /* Count the call as started now and as succeeded or failed once its
       * final status is known. */
      this.callTracker.addCallStarted();
      stream.addStatusWatcher(status => {
        if (status.code === Status.OK) {
          this.callTracker.addCallSucceeded();
        } else {
          this.callTracker.addCallFailed();
        }
      });
    }
    return stream;
  }
}
// Back to Directory File Manager